peer and service ads now insert into peerstore

This commit is contained in:
херетик
2023-06-17 08:34:49 +01:00
parent 70008ab8c0
commit f8fbf0a513
7 changed files with 71 additions and 32 deletions

View File

@@ -212,7 +212,7 @@ func New(p Params) (c *Engine, e error) {
if c.sub, e = c.topic.Subscribe(); fails(e) { if c.sub, e = c.topic.Subscribe(); fails(e) {
return return
} }
log.D.Ln("subscribed to", PubSubTopic, "topic on gossip network") log.T.Ln("subscribed to", PubSubTopic, "topic on gossip network")
} }
c.Manager.AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...) c.Manager.AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...)
// AddIntro a return session for receiving responses, ideally more of these // AddIntro a return session for receiving responses, ideally more of these

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/indra-labs/indra/pkg/onions/adload"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"reflect" "reflect"
@@ -33,16 +34,19 @@ func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message,
for { for {
var m *pubsub.Message var m *pubsub.Message
var e error var e error
log.D.Ln("waiting for next message from gossip network")
if m, e = ng.sub.Next(ng.ctx); e != nil { if m, e = ng.sub.Next(ng.ctx); e != nil {
continue continue
} }
log.D.Ln("received message from gossip network")
if e = handler(m, ng.ctx); fails(e) { if e = handler(m, ng.ctx); fails(e) {
continue continue
} }
select { select {
case <-ng.ctx.Done(): case <-ng.ctx.Done():
log.D.Ln("shutting down ad handler")
break out break out
default:
} }
} }
return return
@@ -71,7 +75,16 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) {
return fmt.Errorf(ErrWrongTypeDecode, return fmt.Errorf(ErrWrongTypeDecode,
adaddress.Magic, reflect.TypeOf(c).String()) adaddress.Magic, reflect.TypeOf(c).String())
} else { } else {
_ = addr // If we got to here now we can add to the PeerStore.
log.D.S("new ad for services:", c)
var id peer.ID
if id, e = peer.IDFromPublicKey(addr.Key); fails(e) {
return
}
if e = ng.Listener.Host.
Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) {
return
}
} }
case *adintro.Ad: case *adintro.Ad:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network") log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
@@ -79,8 +92,33 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) {
return fmt.Errorf(ErrWrongTypeDecode, return fmt.Errorf(ErrWrongTypeDecode,
adintro.Magic, reflect.TypeOf(c).String()) adintro.Magic, reflect.TypeOf(c).String())
} else { } else {
_ = intr // If we got to here now we can add to the PeerStore.
log.D.S("new ad for services:", c)
var id peer.ID
if id, e = peer.IDFromPublicKey(intr.Key); fails(e) {
return
}
if e = ng.Listener.Host.
Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) {
return
}
}
case *adload.Ad:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
if lod, ok := c.(*adload.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
adaddress.Magic, reflect.TypeOf(c).String())
} else {
// If we got to here now we can add to the PeerStore.
log.D.S("new ad for services:", c)
var id peer.ID
if id, e = peer.IDFromPublicKey(lod.Key); fails(e) {
return
}
if e = ng.Listener.Host.
Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) {
return
}
} }
case *adservices.Ad: case *adservices.Ad:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network") log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
@@ -88,17 +126,16 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) {
return fmt.Errorf(ErrWrongTypeDecode, return fmt.Errorf(ErrWrongTypeDecode,
adservices.Magic, reflect.TypeOf(c).String()) adservices.Magic, reflect.TypeOf(c).String())
} else { } else {
// If we got to here now we can add to the PeerStore. // If we got to here now we can add to the PeerStore.
log.D.S("new ad for service:", c) log.D.S("new ad for services:", c)
var id peer.ID var id peer.ID
if id, e = peer.IDFromPublicKey(serv.Key); fails(e) { if id, e = peer.IDFromPublicKey(serv.Key); fails(e) {
return return
} }
if e = ng.Listener.Host. if e = ng.Listener.Host.
Peerstore().Put(id, "service", s.GetAll().ToBytes()); fails(e) { Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) {
return return
} }
} }
case *adpeer.Ad: case *adpeer.Ad:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network") log.D.Ln("received", reflect.TypeOf(c), "from gossip network")

View File

@@ -4,6 +4,7 @@ import (
"github.com/indra-labs/indra" "github.com/indra-labs/indra"
"github.com/indra-labs/indra/pkg/crypto/nonce" "github.com/indra-labs/indra/pkg/crypto/nonce"
"github.com/indra-labs/indra/pkg/onions/adpeer" "github.com/indra-labs/indra/pkg/onions/adpeer"
"github.com/indra-labs/indra/pkg/onions/adservices"
"github.com/indra-labs/indra/pkg/util/splice" "github.com/indra-labs/indra/pkg/util/splice"
"os" "os"
"testing" "testing"
@@ -54,17 +55,18 @@ func TestEngine_PeerStore(t *testing.T) {
if e = engines[0].SendAd(newPeerAd); fails(e) { if e = engines[0].SendAd(newPeerAd); fails(e) {
t.FailNow() t.FailNow()
} }
//time.Sleep(time.Second) time.Sleep(time.Second * 3)
//newServiceAd := adservices.NewServiceAd(nonce.NewID(), newServiceAd := adservices.New(nonce.NewID(),
// engines[0].Manager.GetLocalNodeIdentityPrv(), engines[0].Manager.GetLocalNodeIdentityPrv(),
// 20000, 54321, time.Now().Add(time.Hour*24*7)) []adservices.Service{{20000, 54321}},
//ss := splice.New(newServiceAd.Len()) time.Now().Add(time.Hour*24*7))
//if e = newServiceAd.Encode(ss); fails(e) { ss := splice.New(newServiceAd.Len())
// t.FailNow() if e = newServiceAd.Encode(ss); fails(e) {
//} t.FailNow()
//if e = engines[0].SendAd(newServiceAd); fails(e) { }
// t.FailNow() if e = engines[0].SendAd(newServiceAd); fails(e) {
//} t.FailNow()
}
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
cancel() cancel()
for i := range engines { for i := range engines {

View File

@@ -1,4 +1,4 @@
package adpeer package adload
import ( import (
"github.com/indra-labs/indra/pkg/crypto" "github.com/indra-labs/indra/pkg/crypto"

View File

@@ -1,4 +1,4 @@
package adpeer package adload
import ( import (
"testing" "testing"

View File

@@ -45,8 +45,8 @@ type Ad struct {
var _ coding.Codec = &Ad{} var _ coding.Codec = &Ad{}
// NewServiceAd ... // New ...
func NewServiceAd(id nonce.ID, key *crypto.Prv, services []Service, func New(id nonce.ID, key *crypto.Prv, services []Service,
expiry time.Time) (sv *Ad) { expiry time.Time) (sv *Ad) {
s := splice.New(adintro.Len) s := splice.New(adintro.Len)

View File

@@ -19,7 +19,7 @@ func TestServiceAd(t *testing.T) {
var e error var e error
pr, _, _ := crypto.NewSigner() pr, _, _ := crypto.NewSigner()
id := nonce.NewID() id := nonce.NewID()
sv := NewServiceAd(id, pr, []Service{{80, 50000},{443, 50000}}, time.Now().Add(time.Hour)) sv := New(id, pr, []Service{{80, 50000},{443, 50000}}, time.Now().Add(time.Hour))
log.D.S("service", sv) log.D.S("service", sv)
s := splice.New(sv.Len()) s := splice.New(sv.Len())
if e = sv.Encode(s); fails(e) { if e = sv.Encode(s); fails(e) {