Files
indra/pkg/engine/peerstore.go
l0k18 a2bac60a30 encryption added to tests that didn't refactor correctly
note about encryption and why it must be

test datastore now using same settings as seed

badger options extracted from storage for test and other uses

one place it was missing

all using badger/v3 now

peer database entry count implemented

fixed stray incorrect refactorings

added expiry to interface, now propagates correctly

eliminated type switch with extending of interface

Access to badger View and Update now in Engine

Datastore now available via Listener
2023-08-01 10:50:29 +01:00

260 lines
6.9 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"github.com/dgraph-io/badger/v3"
"github.com/indra-labs/indra/pkg/cert"
magic2 "github.com/indra-labs/indra/pkg/engine/magic"
"github.com/indra-labs/indra/pkg/util/slice"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"reflect"
"time"
"github.com/indra-labs/indra/pkg/codec/ad/addresses"
"github.com/indra-labs/indra/pkg/codec/reg"
"github.com/indra-labs/indra/pkg/util/splice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// SetupGossip joins host to the gossip pubsub network that Indra uses to
// propagate peer metadata.
//
// It creates a GossipSub router on host, joins PubSubTopic and subscribes to
// it, returning the router, the topic handle and the subscription. If any of
// the three steps fails, cancel is invoked and the error is returned together
// with whatever results the earlier steps produced.
func (ng *Engine) SetupGossip(ctx context.Context, host host.Host,
	cancel func()) (PubSub *pubsub.PubSub, topic *pubsub.Topic,
	sub *pubsub.Subscription, e error) {

	// Step one: the gossip router itself.
	PubSub, e = pubsub.NewGossipSub(ctx, host)
	if fails(e) {
		cancel()
		return
	}
	// Step two: join the topic peer ads are published on.
	topic, e = PubSub.Join(PubSubTopic)
	if fails(e) {
		cancel()
		return
	}
	// Step three: subscribe so incoming ads can be read.
	sub, e = topic.Subscribe()
	if fails(e) {
		cancel()
		return
	}
	log.T.Ln(ng.LogEntry("subscribed to"), PubSubTopic,
		"topic on gossip network")
	return
}
// SendAd publishes a, an encoded byte slice that is expected to hold a peer
// advertisement, on the engine's gossip topic using the engine's context. The
// error from the publish call is returned unchanged.
func (ng *Engine) SendAd(a slice.Bytes) (e error) {
	e = ng.topic.Publish(ng.ctx, a)
	return
}
// SendAds encodes and publishes every ad currently held in NodeAds on the
// gossip topic. Primarily called at startup.
//
// It stops at the first ad that fails to encode or publish and returns that
// error; ads after the failing one are not sent.
func (ng *Engine) SendAds() (e error) {
	for _, ad := range ng.NodeAds.GetAsCerts() {
		s := splice.New(ad.Len())
		// The encode error must be assigned to e, otherwise the caller would
		// be told everything was sent when it was not.
		if e = ad.Encode(s); fails(e) {
			return
		}
		if e = ng.topic.Publish(ng.ctx, s.GetAll()); fails(e) {
			return
		}
	}
	return
}
// RunAdHandler starts two goroutines:
//
//   - a reader that pulls messages from the gossip subscription and passes
//     each one to handler, which is expected to update the peerstore;
//   - a publisher that runs gossip with a one second ticker to send this
//     node's own ads.
//
// Both goroutines are bound to the engine's context. The reader also exits if
// the subscription itself has been cancelled.
func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message) (e error)) {
	// Since the frequency of updates should be around 1 hour we run only one
	// thread here. Relays indicate their loading as part of the response
	// message protocol for ranking in the session cache.
	go func(ng *Engine) {
		for {
			// Check for shutdown before blocking. Once the context is
			// cancelled Next returns an error immediately, so without this
			// check the loop would spin forever instead of exiting.
			if ng.ctx.Err() != nil {
				log.D.Ln("shutting down ad handler")
				return
			}
			m, e := ng.sub.Next(ng.ctx)
			if e != nil {
				// A cancelled subscription never yields messages again, so
				// retrying would only busy-loop.
				if errors.Is(e, pubsub.ErrSubscriptionCancelled) {
					log.D.Ln("shutting down ad handler")
					return
				}
				continue
			}
			// A handler failure only affects this message; keep reading.
			fails(handler(m))
		}
	}(ng)
	go func(ng *Engine) {
		log.D.Ln(ng.LogEntry("checking and updating peer information ads"))
		// gossip sends all ads straight away, then re-checks them on every
		// tick, i.e. once a second.
		tick := time.NewTicker(time.Second)
		// Release the ticker's resources when gossip returns.
		defer tick.Stop()
		ng.gossip(tick)
	}(ng)
}
// Fingerprint returns the fingerprint string of the local node's identity
// public key, as produced by that key's Fingerprint method. It serves as a
// short identifier for this node.
func (ng *Engine) Fingerprint() (fp string) {
	local := ng.Mgr().GetLocalNode()
	fp = local.Identity.Pub.Fingerprint()
	return
}
// LogEntry prefixes s with this node's fingerprint and a single space, so that
// log lines can be attributed to the node that wrote them.
func (ng *Engine) LogEntry(s string) (entry string) {
	entry = ng.Fingerprint() + " " + s
	return
}
// gossip publishes this node's ads on the gossip topic.
//
// It first sends every ad via SendAds, because the node is starting up. It
// then loops: each pass re-publishes the ads in NodeAds that report themselves
// as expired, and then waits for either the next tick or cancellation of the
// engine's context.
//
// The loop ends when the context is cancelled or when publishing an ad fails.
// An ad that fails to encode is skipped for that pass rather than being
// published in a possibly incomplete form. The caller owns tick and is
// responsible for stopping it.
func (ng *Engine) gossip(tick *time.Ticker) {
	// Send out all ads because we are starting up. A failure here is not
	// fatal; the periodic pass below still runs.
	fails(ng.SendAds())
	for {
		log.D.Ln(ng.LogEntry("gossip tick"))
		for _, ad := range ng.NodeAds.GetAsCerts() {
			if !ad.Expired() {
				continue
			}
			s := splice.New(ad.Len())
			// Do not publish a buffer whose encoding failed.
			if fails(ad.Encode(s)) {
				continue
			}
			if fails(ng.topic.Publish(ng.ctx, s.GetAll())) {
				return
			}
		}
		// After all that is done, check if we are shutting down, if so exit;
		// otherwise wait for the next tick.
		select {
		case <-ng.ctx.Done():
			return
		case <-tick.C:
		}
	}
}
// ErrWrongTypeDecode is the format string for the error reported when a
// decoded message's concrete type does not match what its magic implies. It
// takes two %s arguments: the magic, then the name of the decoded type.
const ErrWrongTypeDecode = "magic '%s' but type is '%s'"
// HandleAd recognises, decodes, validates and stores an ad received from the
// gossip network in the host's peerstore.
//
// The ad is stored under the peer ID it yields via GetID, keyed by the magic
// prefix of the message, with the full encoded message as the value.
//
// Messages that are empty, that were received from this host, or that
// advertise this host's own ID are dropped and nil is returned. An error is
// returned when the message is not recognised, fails to decode, is not a
// cert.Act, has no usable ID, fails validation, or cannot be stored.
func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
	if len(p.Data) < 1 {
		log.E.Ln("received slice of no length")
		return
	}
	self := ng.Listener.Host.ID()
	if p.ReceivedFrom == self {
		// Not sure why the Next function delivers these but they are not
		// needed.
		return
	}
	s := splice.NewFrom(p.Data)
	c := reg.Recognise(s)
	if c == nil {
		return errors.New("ad not recognised")
	}
	if e = c.Decode(s); fails(e) {
		return
	}
	a, ok := c.(cert.Act)
	if !ok {
		return fmt.Errorf(ErrWrongTypeDecode,
			addresses.Magic, reflect.TypeOf(c).String())
	}
	var id peer.ID
	if id, e = a.GetID(); fails(e) {
		return
	}
	if id == self {
		// Our own ad relayed back by another peer: really ignore it instead
		// of falling through and storing it.
		log.T.Ln("ignoring own ad")
		return
	}
	if !a.Validate() {
		return errors.New(reflect.TypeOf(c).String() +
			" ad failed validation")
	}
	// The leading magic2.Len bytes of the message identify the ad type and
	// double as the peerstore key.
	raw := s.GetAll()
	magic := string(raw[:magic2.Len])
	log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
		"from gossip network for node", a.PubKey().Fingerprint())
	// If we got to here now we can add to the PeerStore.
	log.D.Ln("storing value with magic", magic, raw[:magic2.Len])
	if e = ng.Listener.Host.
		Peerstore().Put(id, magic, raw.ToBytes()); fails(e) {
		return
	}
	return
}
// GetPeerRecord queries the peerstore for an ad from a given peer.ID and the ad
// type key, and decodes it.
//
// The ad type keys are the same as the Magic of each ad type, to be simple.
//
// An error is returned when the peerstore lookup fails, the stored value is
// not a slice.Bytes, the record is empty (which this file uses to mark a
// cleared record, see ClearPeerRecord), the bytes are not recognised or fail
// to decode, or the decoded value is not a cert.Act. add is only meaningful
// when e is nil.
func (ng *Engine) GetPeerRecord(id peer.ID, key string) (add cert.Act, e error) {
	var a interface{}
	if a, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) {
		return
	}
	var ok bool
	var adb slice.Bytes
	if adb, ok = a.(slice.Bytes); !ok {
		e = errors.New("peer record did not decode slice.Bytes")
		return
	}
	if len(adb) < 1 {
		// Return here: there is nothing to decode, and carrying on would
		// replace this error with a less helpful one.
		e = fmt.Errorf("record for peer ID %v key %s has expired", id, key)
		return
	}
	s := splice.NewFrom(adb)
	c := reg.Recognise(s)
	if c == nil {
		e = errors.New(key + " peer record not recognised")
		return
	}
	if e = c.Decode(s); fails(e) {
		return
	}
	if add, ok = c.(cert.Act); !ok {
		e = errors.New(key + " peer record did not decode as Act")
	}
	return
}
// PeerstoreUpdate runs fn through the Update method of the badger.DB behind
// the listener's datastore, so the closure can walk the records and modify
// them. The error from Update is returned unchanged.
func (ng *Engine) PeerstoreUpdate(fn func(txn *badger.Txn) error) (e error) {
	db := ng.Listener.Datastore.DB
	e = db.Update(fn)
	return
}
// PeerstoreView runs fn through the View method of the badger.DB behind the
// listener's datastore, so the closure can walk the records. View is badger's
// read-only entry point; use PeerstoreUpdate to modify records. The error from
// View is returned unchanged.
func (ng *Engine) PeerstoreView(fn func(txn *badger.Txn) error) (e error) {
	db := ng.Listener.Datastore.DB
	e = db.View(fn)
	return
}
// ClearPeerRecord marks a peer record as deleted by overwriting it with an
// empty byte slice. The record must already exist: if looking it up fails,
// that error is returned and nothing is written.
//
// todo: these should be purged from the peerstore in a GC pass.
// todo: Expiry and storage limits...
// todo: The PeerstoreUpdate function can delete records. (probably absent this why nobody uses it).
func (ng *Engine) ClearPeerRecord(id peer.ID, key string) (e error) {
	store := ng.Listener.Host.Peerstore()
	// Only records that are present get cleared.
	if _, e = store.Get(id, key); fails(e) {
		return
	}
	e = store.Put(id, key, []byte{})
	// Run the result through fails as the rest of the file does; the error is
	// returned to the caller either way.
	fails(e)
	return
}