Peers now gossip ads

This commit is contained in:
херетик
2023-06-09 07:12:53 +01:00
parent 1dd2ac538b
commit 724446e0d3
5 changed files with 108 additions and 18 deletions

View File

@@ -172,6 +172,7 @@ func (ng *Engine) Shutdown() {
// Start a single thread of the Engine.
func (ng *Engine) Start() {
log.T.Ln("starting engine")
ng.RunAdHandler(ng.HandleAd)
for {
if ng.Handler() {
break
@@ -197,13 +198,11 @@ func New(p Params) (c *Engine, e error) {
h: hidden.NewHiddenrouting(),
Pause: qu.T(),
}
var ps *pubsub.PubSub
if p.Listener.Host != nil {
if ps, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) {
if c.PubSub, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) {
cancel()
return
}
c.PubSub = ps
if c.topic, e = c.PubSub.Join(PubSubTopic); fails(e) {
return
}

View File

@@ -1,11 +1,101 @@
package engine
//import "github.com/libp2p/go-libp2p/core/peer"
//
//func (ng *Engine) Publish(p peer.ID, key string, val interface{}) error {
// return ng.Manager.Listener.Host.Peerstore().Put(p, key, val)
//}
//
//func (ng *Engine) FindPeerRecord(p peer.ID, key string) (val interface{}, e error) {
// return ng.Manager.Listener.Host.Peerstore().Get(p, key)
//}
import (
"context"
"errors"
"fmt"
"github.com/indra-labs/indra/pkg/ad"
"github.com/indra-labs/indra/pkg/onions/adaddress"
"github.com/indra-labs/indra/pkg/onions/adintro"
"github.com/indra-labs/indra/pkg/onions/adpeer"
"github.com/indra-labs/indra/pkg/onions/adservice"
"github.com/indra-labs/indra/pkg/onions/ont"
"github.com/indra-labs/indra/pkg/onions/reg"
"github.com/indra-labs/indra/pkg/util/slice"
"github.com/indra-labs/indra/pkg/util/splice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"reflect"
)
func (ng *Engine) SendAd(a ad.Ad) (e error) {
return ng.topic.Publish(ng.ctx, ont.Encode(a).GetAll())
}
func (ng *Engine) RunAdHandler(handler func(b slice.Bytes, ctx context.Context) (e error)) {
// Since the frequency of updates should be around 1 hour we run here only one
// thread here. Relays indicate their loading as part of the response message
// protocol for ranking in the session cache.
go func(ng *Engine) {
out:
for {
var m *pubsub.Message
var e error
if m, e = ng.sub.Next(ng.ctx); e != nil {
continue
}
if e = handler(m.Data, ng.ctx); fails(e) {
continue
}
select {
case <-ng.ctx.Done():
break out
}
}
return
}(ng)
}
const ErrWrongTypeDecode = "magic '%s' but type is '%s'"
func (ng *Engine) HandleAd(b slice.Bytes, ctx context.Context) (e error) {
if len(b) < 1 {
log.E.Ln("received slice of no length")
return
}
s := splice.NewFrom(b)
c := reg.Recognise(s)
if c == nil {
return errors.New("message not recognised")
}
if e = c.Decode(s); fails(e) {
return
}
switch c.Magic() {
case adaddress.Magic:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
if addr, ok := c.(*adaddress.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
adaddress.Magic, reflect.TypeOf(c).String())
} else {
_ = addr
}
case adintro.Magic:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
if intr, ok := c.(*adintro.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
adintro.Magic, reflect.TypeOf(c).String())
} else {
_ = intr
}
case adservice.Magic:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
if serv, ok := c.(*adservice.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
adservice.Magic, reflect.TypeOf(c).String())
} else {
_ = serv
}
case adpeer.Magic:
log.D.Ln("received", reflect.TypeOf(c), "from gossip network")
if peer, ok := c.(*adpeer.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
adpeer.Magic, reflect.TypeOf(c).String())
} else {
_ = peer
log.T.Ln("received", reflect.TypeOf(c))
}
}
return
}

View File

@@ -44,8 +44,9 @@ func TestEngine_PeerStore(t *testing.T) {
if e = newAd.Encode(s); fails(e) {
t.FailNow()
}
//engines[0].sub.Next(engines[0].ctx)
//engines[1].topic.Publish()
if e = engines[0].SendAd(newAd); fails(e) {
t.FailNow()
}
time.Sleep(time.Second * 3)
cancel()
for i := range engines {

View File

@@ -41,9 +41,9 @@ type Onion interface {
}
// Encode is the generic encoder for an onion, all onions can be encoded with it.
func Encode(on Onion) (s *splice.Splice) {
s = splice.New(on.Len())
fails(on.Encode(s))
func Encode(d coding.Codec) (s *splice.Splice) {
s = splice.New(d.Len())
fails(d.Encode(s))
return
}

View File

@@ -389,7 +389,7 @@ func logPrint(
Longest.Store(uint32(len(loc)))
}
loc = color.OpItalic.Sprint(color.OpUnderscore.Sprint(loc)) + strings.Repeat(" ", int(Longest.Load())-len(loc)+1)
formatString := fmt.Sprint("%s%-6v %s %s %s")
formatString := "%s%-6v %s %s %s"
timeText = time.Now().Format("2006-01-02 15:04:05.999999999 UTC+0700")
var app string
if len(App.Load()) > 0 {