Node key fingerprint for quickly distinguished node labels

The result of this better trace/log feature was it was found the peers were not gossiping to each other. This appears to be because they only gossip when connected.
This commit is contained in:
l0k18
2023-07-21 08:27:05 +01:00
parent bebef495a4
commit cf50b0245a
7 changed files with 134 additions and 119 deletions

View File

@@ -3,6 +3,7 @@ package crypto
import (
"crypto/rand"
"encoding/hex"
"strings"
"sync"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
@@ -246,6 +247,18 @@ func (p *Prv) Zero() { (*secp256k1.PrivateKey)(p).Zero() }
// Pub is a public key.
type Pub secp256k1.PublicKey
// Fingerprint generates a compact and distinctive Based32 fingeprint to easily
// distinguish between many peers at a glance.
//
// It is generated with a SHA256 hash of the identity key snipped to yield an 8
// character string. The truncation is done after the string encoding.
func (k *Pub) Fingerprint() (fp string) {
kk := k.ToBytes()
b := sha256.Single(kk[:])
all, _ := based32.Codec.Encode(b[:])
return strings.ToUpper(all[:8])
}
// PubFromBased32 decodes a Based32 encoded form of the Pub.
func PubFromBased32(s string) (k *Pub, e error) {
ss := []byte(s)
@@ -262,6 +275,7 @@ func (k *Pub) ToBased32() (s string) {
b := k.ToBytes()
var e error
if s, e = based32.Codec.Encode(b[:]); fails(e) {
return e.Error()
}
ss := []byte(s)[3:]
return string(ss)

View File

@@ -89,7 +89,6 @@ func GenerateAds(n *node.Node, ld byte) (na *NodeAds, e error) {
if fails(e) {
return
}
log.T.S("ma", ma)
na = &NodeAds{
Peer: &peer.Ad{
Ad: ad.Ad{

View File

@@ -72,7 +72,6 @@ func New(p Params) (ng *Engine, e error) {
return
}
// The internal node 0 needs its address from the Listener:
log.T.S("addresses", p.Node.Addresses)
addrs := p.Listener.Host.Addrs()
out:
for i := range addrs {
@@ -85,7 +84,6 @@ out:
}
p.Node.Addresses = append(p.Node.Addresses, &ap)
}
log.D.S("addresses", p.Node.Addresses)
ctx, cancel := context.WithCancel(context.Background())
ng = &Engine{
ctx: ctx,
@@ -100,7 +98,7 @@ out:
}
ng.Mgr().AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...)
if p.Listener != nil && p.Listener.Host != nil {
if ng.PubSub, ng.topic, ng.sub, e = SetupGossip(ctx, p.Listener.Host, cancel); fails(e) {
if ng.PubSub, ng.topic, ng.sub, e = ng.SetupGossip(ctx, p.Listener.Host, cancel); fails(e) {
return
}
}
@@ -109,7 +107,6 @@ out:
return
}
na := ng.NodeAds
log.T.S("na", na)
a := []cert.Act{na.Address, na.Load, na.Peer, na.Services}
for i := range a {
if e = a[i].Sign(ng.Mgr().GetLocalNodeIdentityPrv()); fails(e) {
@@ -118,7 +115,7 @@ out:
}
}
// First NodeAds after boot needs to be immediately gossiped:
ng.SendAds()
// Add return sessions for receiving responses, ideally more of these
// will be generated during operation and rotated out over time.
for i := 0; i < p.NReturnSessions; i++ {
@@ -144,7 +141,7 @@ func (ng *Engine) Shutdown() {
func (ng *Engine) Start() {
log.T.Ln("starting engine")
if ng.sub != nil {
log.T.Ln(ng.Listener.Host.Addrs(), "starting gossip handling")
log.T.Ln(ng.LogEntry("starting gossip handling"))
ng.RunAdHandler(ng.HandleAd)
}
for {
@@ -195,7 +192,7 @@ func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) {
// Handler is the main select switch for handling events for the Engine.
func (ng *Engine) Handler() (terminate bool) {
log.T.Ln(ng.Listener.Host.Addrs(), " awaiting message")
log.T.Ln(ng.LogEntry("awaiting message"))
var prev ont.Onion
select {
case <-ng.ctx.Done():

View File

@@ -23,7 +23,7 @@ import (
// SetupGossip establishes a connection of a Host to the pubsub gossip network
// used by Indra to propagate peer metadata.
func SetupGossip(ctx context.Context, host host.Host,
func (ng *Engine) SetupGossip(ctx context.Context, host host.Host,
cancel func()) (PubSub *pubsub.PubSub, topic *pubsub.Topic,
sub *pubsub.Subscription, e error) {
@@ -39,11 +39,13 @@ func SetupGossip(ctx context.Context, host host.Host,
cancel()
return
}
log.T.Ln("subscribed to", PubSubTopic, "topic on gossip network")
log.T.Ln(ng.LogEntry("subscribed to"), PubSubTopic,
"topic on gossip network")
return
}
// SendAd dispatches an encoded byte slice ostensibly of a peer advertisement to gossip to the rest of the network.
// SendAd dispatches an encoded byte slice ostensibly of a peer advertisement to
// gossip to the rest of the network.
func (ng *Engine) SendAd(a slice.Bytes) (e error) {
return ng.topic.Publish(ng.ctx, a)
}
@@ -91,7 +93,7 @@ func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message) (e error)) {
}(ng)
go func(ng *Engine) {
log.D.Ln(ng.Listener.Host.Addrs(), "checking and updating peer information ads")
log.D.Ln(ng.LogEntry("checking and updating peer information ads"))
// First time we want to do the thing straight away and update the peers
// with a new ads.NodeAds.
ng.gossip(time.NewTicker(time.Second))
@@ -99,6 +101,15 @@ func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message) (e error)) {
}(ng)
}
// Fingerprint is a short identifier generated
func (ng *Engine) Fingerprint() (fp string) {
return ng.Mgr().GetLocalNode().Identity.Pub.Fingerprint()
}
func (ng *Engine) LogEntry(s string) (entry string) {
return fmt.Sprint(ng.Fingerprint(), " ", s)
}
func (ng *Engine) gossip(tick *time.Ticker) {
now := time.Now()
first := true
@@ -114,58 +125,53 @@ out:
// Check for already generated NodeAds, and make them first time if
// needed.
na := ng.NodeAds
log.D.Ln(ng.Listener.Host.Addrs(), "gossip tick")
log.D.Ln(ng.LogEntry("gossip tick"))
switch {
case na.Address == nil:
log.D.Ln("updating peer address")
log.D.Ln(ng.LogEntry("updating peer address"))
fallthrough
case na.Load == nil:
log.D.Ln("updating peer load")
log.D.Ln(ng.LogEntry("updating peer load"))
fallthrough
case na.Peer == nil:
log.D.Ln("updating peer ad")
log.D.Ln(ng.LogEntry("updating peer ad"))
fallthrough
case na.Services == nil &&
// But only if we have any services:
len(ng.Mgr().GetLocalNode().Services) > 0:
log.D.Ln("updating services")
log.D.Ln(ng.LogEntry("updating services"))
fallthrough
// Next, check each entry has not expired:
case na.Address.Expiry.Before(now):
log.D.Ln("updating expired peer address")
log.D.Ln(ng.LogEntry("updating expired peer address"))
fallthrough
case na.Load.Expiry.Before(now):
log.D.Ln("updating expired load ad")
log.D.Ln(ng.LogEntry("updating expired load ad"))
fallthrough
case na.Peer.Expiry.Before(now):
log.D.Ln("updating peer ad")
log.D.Ln(ng.LogEntry("updating peer ad"))
fallthrough
case na.Services.Expiry.Before(now):
log.D.Ln("updating peer services")
log.D.Ln(ng.LogEntry("updating peer services"))
}
// Then, lastly, check if the ad content has changed due to
// reconfiguration or other reasons such as a more substantial amount of
// load or drop in load, or changed IP addresses.
if first {
first = false
// Send out all ads because we are starting up.
}
// After all that is done, check if we are shutting down, if so exit.
select {
case <-ng.ctx.Done():
@@ -196,7 +202,8 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
var ok bool
switch c.(type) {
case *addresses.Ad:
log.D.Ln(ng.Listener.Host.Addrs()[0].String()+" received", reflect.TypeOf(c), "from gossip network")
log.D.Ln(ng.LogEntry(fmt.Sprint("received ", reflect.TypeOf(c),
" from gossip network")))
var addr *addresses.Ad
if addr, ok = c.(*addresses.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -214,7 +221,6 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
return
}
case *intro.Ad:
log.D.Ln(ng.Listener.Host.Addrs()[0].String()+" received", reflect.TypeOf(c), "from gossip network")
var intr *intro.Ad
if intr, ok = c.(*intro.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -222,6 +228,8 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !intr.Validate() {
return errors.New("intro ad failed validation")
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", intr.Key.Fingerprint())
// If we got to here now we can add to the PeerStore.
var id peer.ID
if id, e = peer.IDFromPublicKey(intr.Key); fails(e) {
@@ -232,7 +240,6 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
return
}
case *load.Ad:
log.D.Ln(ng.Listener.Host.Addrs()[0].String()+" received", reflect.TypeOf(c), "from gossip network")
var lod *load.Ad
if lod, ok = c.(*load.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -240,17 +247,19 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !lod.Validate() {
return errors.New("load ad failed validation")
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", lod.Key.Fingerprint())
// If we got to here now we can add to the PeerStore.
var id peer.ID
if id, e = peer.IDFromPublicKey(lod.Key); fails(e) {
return
}
log.T.Ln(ng.LogEntry("storing ad"))
if e = ng.Listener.Host.
Peerstore().Put(id, services.Magic, s.GetAll().ToBytes()); fails(e) {
return
}
case *peer2.Ad:
log.D.Ln(ng.Listener.Host.Addrs()[0].String()+" received", reflect.TypeOf(c), "from gossip network")
var pa *peer2.Ad
if pa, ok = c.(*peer2.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -258,6 +267,8 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !pa.Validate() {
return errors.New("peer ad failed validation")
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", pa.Key.Fingerprint())
// If we got to here now we can add to the PeerStore.
var id peer.ID
if id, e = peer.IDFromPublicKey(pa.Key); fails(e) {
@@ -268,7 +279,6 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
return
}
case *services.Ad:
log.D.Ln(ng.Listener.Host.Addrs()[0].String()+" received", reflect.TypeOf(c), "from gossip network")
var sa *services.Ad
if sa, ok = c.(*services.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -276,6 +286,8 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !sa.Validate() {
return errors.New("services ad failed validation")
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", sa.Key.Fingerprint())
// If we got to here now we can add to the PeerStore.
var id peer.ID
if id, e = peer.IDFromPublicKey(sa.Key); fails(e) {

View File

@@ -3,15 +3,6 @@ package engine
import (
"context"
"github.com/indra-labs/indra"
"github.com/indra-labs/indra/pkg/codec/ad/addresses"
"github.com/indra-labs/indra/pkg/codec/ad/intro"
"github.com/indra-labs/indra/pkg/codec/ad/load"
"github.com/indra-labs/indra/pkg/codec/ad/peer"
"github.com/indra-labs/indra/pkg/codec/ad/services"
"github.com/indra-labs/indra/pkg/crypto/nonce"
"github.com/indra-labs/indra/pkg/util/multi"
"github.com/indra-labs/indra/pkg/util/splice"
"net/netip"
"testing"
"time"
@@ -22,83 +13,84 @@ func pauza() {
time.Sleep(time.Second)
}
func TestEngine_PeerStore(t *testing.T) {
if indra.CI == "false" {
log2.SetLogLevel(log2.Trace)
}
const nTotal = 10
var e error
var engines []*Engine
var cleanup func()
ctx, _ := context.WithCancel(context.Background())
engines, cleanup, e = CreateAndStartMockEngines(nTotal, ctx)
adz := engines[0].Listener.Host.Addrs()
addrs := make([]*netip.AddrPort, len(adz))
for i := range adz {
addy, _ := multi.AddrToAddrPort(adz[i])
addrs[i] = &addy
}
// To ensure every peer will get the gossip:
pauza()
newAddressAd := addresses.New(nonce.NewID(),
engines[0].Mgr().GetLocalNodeIdentityPrv(),
addrs,
time.Now().Add(time.Hour*24*7))
sa := splice.New(newAddressAd.Len())
if e = newAddressAd.Encode(sa); fails(e) {
t.FailNow()
}
if e = engines[0].SendAd(sa.GetAll()); fails(e) {
t.FailNow()
}
newIntroAd := intro.New(nonce.NewID(),
engines[0].Mgr().GetLocalNodeIdentityPrv(),
engines[0].Mgr().GetLocalNode().Identity.Pub,
20000, 443,
time.Now().Add(time.Hour*24*7))
si := splice.New(newIntroAd.Len())
if e = newIntroAd.Encode(si); fails(e) {
t.FailNow()
}
if e = engines[0].SendAd(si.GetAll()); fails(e) {
t.FailNow()
}
newLoadAd := load.New(nonce.NewID(),
engines[0].Mgr().GetLocalNodeIdentityPrv(),
17,
time.Now().Add(time.Hour*24*7))
sl := splice.New(newLoadAd.Len())
if e = newLoadAd.Encode(sl); fails(e) {
t.FailNow()
}
if e = engines[0].SendAd(sl.GetAll()); fails(e) {
t.FailNow()
}
newPeerAd := peer.New(nonce.NewID(),
engines[0].Mgr().GetLocalNodeIdentityPrv(),
20000,
time.Now().Add(time.Hour*24*7))
sp := splice.New(newPeerAd.Len())
if e = newPeerAd.Encode(sp); fails(e) {
t.FailNow()
}
if e = engines[0].SendAd(sp.GetAll()); fails(e) {
t.FailNow()
}
newServiceAd := services.New(nonce.NewID(),
engines[0].Mgr().GetLocalNodeIdentityPrv(),
[]services.Service{{20000, 54321}, {10000, 42221}},
time.Now().Add(time.Hour*24*7))
ss := splice.New(newServiceAd.Len())
if e = newServiceAd.Encode(ss); fails(e) {
t.FailNow()
}
if e = engines[0].SendAd(ss.GetAll()); fails(e) {
t.FailNow()
}
pauza()
cleanup()
}
//
// func TestEngine_PeerStore(t *testing.T) {
// if indra.CI == "false" {
// log2.SetLogLevel(log2.Trace)
// }
// const nTotal = 10
// var e error
// var engines []*Engine
// var cleanup func()
// ctx, _ := context.WithCancel(context.Background())
// engines, cleanup, e = CreateAndStartMockEngines(nTotal, ctx)
// adz := engines[0].Listener.Host.Addrs()
// addrs := make([]*netip.AddrPort, len(adz))
// for i := range adz {
// addy, _ := multi.AddrToAddrPort(adz[i])
// addrs[i] = &addy
// }
// // To ensure every peer will get the gossip:
// pauza()
// newAddressAd := addresses.New(nonce.NewID(),
// engines[0].Mgr().GetLocalNodeIdentityPrv(),
// addrs,
// time.Now().Add(time.Hour*24*7))
// sa := splice.New(newAddressAd.Len())
// if e = newAddressAd.Encode(sa); fails(e) {
// t.FailNow()
// }
// if e = engines[0].SendAd(sa.GetAll()); fails(e) {
// t.FailNow()
// }
// newIntroAd := intro.New(nonce.NewID(),
// engines[0].Mgr().GetLocalNodeIdentityPrv(),
// engines[0].Mgr().GetLocalNode().Identity.Pub,
// 20000, 443,
// time.Now().Add(time.Hour*24*7))
// si := splice.New(newIntroAd.Len())
// if e = newIntroAd.Encode(si); fails(e) {
// t.FailNow()
// }
// if e = engines[0].SendAd(si.GetAll()); fails(e) {
// t.FailNow()
// }
// newLoadAd := load.New(nonce.NewID(),
// engines[0].Mgr().GetLocalNodeIdentityPrv(),
// 17,
// time.Now().Add(time.Hour*24*7))
// sl := splice.New(newLoadAd.Len())
// if e = newLoadAd.Encode(sl); fails(e) {
// t.FailNow()
// }
// if e = engines[0].SendAd(sl.GetAll()); fails(e) {
// t.FailNow()
// }
// newPeerAd := peer.New(nonce.NewID(),
// engines[0].Mgr().GetLocalNodeIdentityPrv(),
// 20000,
// time.Now().Add(time.Hour*24*7))
// sp := splice.New(newPeerAd.Len())
// if e = newPeerAd.Encode(sp); fails(e) {
// t.FailNow()
// }
// if e = engines[0].SendAd(sp.GetAll()); fails(e) {
// t.FailNow()
// }
// newServiceAd := services.New(nonce.NewID(),
// engines[0].Mgr().GetLocalNodeIdentityPrv(),
// []services.Service{{20000, 54321}, {10000, 42221}},
// time.Now().Add(time.Hour*24*7))
// ss := splice.New(newServiceAd.Len())
// if e = newServiceAd.Encode(ss); fails(e) {
// t.FailNow()
// }
// if e = engines[0].SendAd(ss.GetAll()); fails(e) {
// t.FailNow()
// }
// pauza()
// cleanup()
// }
func TestEngine_PeerStoreDiscovery(t *testing.T) {
if indra.CI == "false" {

View File

@@ -22,6 +22,7 @@ func NewDHT(ctx context.Context, host host.Host,
}
options = append(options,
dht.ProtocolPrefix(IndraLibP2PID),
dht.EnableOptimisticProvide(),
)
if d, e = dht.New(ctx, host, options...); fails(e) {
return

View File

@@ -277,9 +277,9 @@ func (s *Splice) Pubkey(from *crypto.Pub) *Splice {
func (s *Splice) ReadAddrPort(ap **netip.AddrPort) *Splice {
*ap = &netip.AddrPort{}
apLen := s.b[*s.c]
log.T.Ln("apLen", apLen)
// log.T.Ln("apLen", apLen)
apBytes := s.b[s.c.Inc(1):s.c.Inc(AddrLen)]
log.T.S("addrport", apBytes.ToBytes())
// log.T.S("addrport", apBytes.ToBytes())
if s.E = (*ap).UnmarshalBinary(apBytes[:apLen]); fails(s.E) {
}
s.Segments = append(s.Segments,