diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 5ffe983f..020ebbc8 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -172,6 +172,7 @@ func (ng *Engine) Shutdown() { // Start a single thread of the Engine. func (ng *Engine) Start() { log.T.Ln("starting engine") + ng.RunAdHandler(ng.HandleAd) for { if ng.Handler() { break @@ -197,13 +198,11 @@ func New(p Params) (c *Engine, e error) { h: hidden.NewHiddenrouting(), Pause: qu.T(), } - var ps *pubsub.PubSub if p.Listener.Host != nil { - if ps, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) { + if c.PubSub, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) { cancel() return } - c.PubSub = ps if c.topic, e = c.PubSub.Join(PubSubTopic); fails(e) { return } diff --git a/pkg/engine/peerstore.go b/pkg/engine/peerstore.go index 4f2b977c..176bd532 100644 --- a/pkg/engine/peerstore.go +++ b/pkg/engine/peerstore.go @@ -1,11 +1,101 @@ package engine -//import "github.com/libp2p/go-libp2p/core/peer" -// -//func (ng *Engine) Publish(p peer.ID, key string, val interface{}) error { -// return ng.Manager.Listener.Host.Peerstore().Put(p, key, val) -//} -// -//func (ng *Engine) FindPeerRecord(p peer.ID, key string) (val interface{}, e error) { -// return ng.Manager.Listener.Host.Peerstore().Get(p, key) -//} +import ( + "context" + "errors" + "fmt" + "github.com/indra-labs/indra/pkg/ad" + "github.com/indra-labs/indra/pkg/onions/adaddress" + "github.com/indra-labs/indra/pkg/onions/adintro" + "github.com/indra-labs/indra/pkg/onions/adpeer" + "github.com/indra-labs/indra/pkg/onions/adservice" + "github.com/indra-labs/indra/pkg/onions/ont" + "github.com/indra-labs/indra/pkg/onions/reg" + "github.com/indra-labs/indra/pkg/util/slice" + "github.com/indra-labs/indra/pkg/util/splice" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "reflect" +) + +func (ng *Engine) SendAd(a ad.Ad) (e error) { + return ng.topic.Publish(ng.ctx, ont.Encode(a).GetAll()) +} + +func (ng *Engine) RunAdHandler(handler func(b slice.Bytes, ctx context.Context) (e error)) { + // Since the frequency of updates should be around 1 hour we run here only one + // thread here. Relays indicate their loading as part of the response message + // protocol for ranking in the session cache. + go func(ng *Engine) { + out: + for { + var m *pubsub.Message + var e error + if m, e = ng.sub.Next(ng.ctx); e != nil { + continue + } + if e = handler(m.Data, ng.ctx); fails(e) { + continue + } + select { + case <-ng.ctx.Done(): + break out + } + } + return + }(ng) +} + +const ErrWrongTypeDecode = "magic '%s' but type is '%s'" + +func (ng *Engine) HandleAd(b slice.Bytes, ctx context.Context) (e error) { + if len(b) < 1 { + log.E.Ln("received slice of no length") + return + } + s := splice.NewFrom(b) + c := reg.Recognise(s) + if c == nil { + return errors.New("message not recognised") + } + if e = c.Decode(s); fails(e) { + return + } + switch c.Magic() { + case adaddress.Magic: + log.D.Ln("received", reflect.TypeOf(c), "from gossip network") + if addr, ok := c.(*adaddress.Ad); !ok { + return fmt.Errorf(ErrWrongTypeDecode, + adaddress.Magic, reflect.TypeOf(c).String()) + } else { + _ = addr + } + case adintro.Magic: + log.D.Ln("received", reflect.TypeOf(c), "from gossip network") + if intr, ok := c.(*adintro.Ad); !ok { + return fmt.Errorf(ErrWrongTypeDecode, + adintro.Magic, reflect.TypeOf(c).String()) + } else { + _ = intr + + } + case adservice.Magic: + log.D.Ln("received", reflect.TypeOf(c), "from gossip network") + if serv, ok := c.(*adservice.Ad); !ok { + return fmt.Errorf(ErrWrongTypeDecode, + adservice.Magic, reflect.TypeOf(c).String()) + } else { + _ = serv + + } + case adpeer.Magic: + log.D.Ln("received", reflect.TypeOf(c), "from gossip network") + if peer, ok := c.(*adpeer.Ad); !ok { + return fmt.Errorf(ErrWrongTypeDecode, + adpeer.Magic, reflect.TypeOf(c).String()) + } else { + _ = peer + log.T.Ln("received", reflect.TypeOf(c)) + } + } + return +} diff --git a/pkg/engine/peerstore_test.go b/pkg/engine/peerstore_test.go index 622557b2..4293f3b0 100644 --- a/pkg/engine/peerstore_test.go +++ b/pkg/engine/peerstore_test.go @@ -44,8 +44,9 @@ func TestEngine_PeerStore(t *testing.T) { if e = newAd.Encode(s); fails(e) { t.FailNow() } - //engines[0].sub.Next(engines[0].ctx) - //engines[1].topic.Publish() + if e = engines[0].SendAd(newAd); fails(e) { + t.FailNow() + } time.Sleep(time.Second * 3) cancel() for i := range engines { diff --git a/pkg/onions/ont/interfaces.go b/pkg/onions/ont/interfaces.go index 7d4b1a78..41af47c8 100644 --- a/pkg/onions/ont/interfaces.go +++ b/pkg/onions/ont/interfaces.go @@ -41,9 +41,9 @@ type Onion interface { } // Encode is the generic encoder for an onion, all onions can be encoded with it. -func Encode(on Onion) (s *splice.Splice) { - s = splice.New(on.Len()) - fails(on.Encode(s)) +func Encode(d coding.Codec) (s *splice.Splice) { + s = splice.New(d.Len()) + fails(d.Encode(s)) return } diff --git a/pkg/proc/log/log.go b/pkg/proc/log/log.go index f4f50ff2..4d433e0b 100644 --- a/pkg/proc/log/log.go +++ b/pkg/proc/log/log.go @@ -389,7 +389,7 @@ func logPrint( Longest.Store(uint32(len(loc))) } loc = color.OpItalic.Sprint(color.OpUnderscore.Sprint(loc)) + strings.Repeat(" ", int(Longest.Load())-len(loc)+1) - formatString := fmt.Sprint("%s%-6v %s %s %s") + formatString := "%s%-6v %s %s %s" timeText = time.Now().Format("2006-01-02 15:04:05.999999999 UTC+0700") var app string if len(App.Load()) > 0 {