refactor peer ad generator and scaffold gossip update service
This commit is contained in:
4
go.mod
4
go.mod
@@ -69,7 +69,7 @@ require (
|
||||
github.com/decred/dcrd/lru v1.0.0 // indirect
|
||||
github.com/dgraph-io/badger v1.6.2 // indirect
|
||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||
github.com/docker/distribution v2.8.1+incompatible // indirect
|
||||
github.com/docker/distribution v2.8.2+incompatible // indirect
|
||||
github.com/docker/docker-credential-helpers v0.7.0 // indirect
|
||||
github.com/docker/go-connections v0.4.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
@@ -187,7 +187,7 @@ require (
|
||||
github.com/onsi/ginkgo/v2 v2.9.2 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
|
||||
github.com/opencontainers/runc v1.1.4 // indirect
|
||||
github.com/opencontainers/runc v1.1.5 // indirect
|
||||
github.com/opencontainers/runtime-spec v1.0.3-0.20211123151946-c2389c3cb60a // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||
|
||||
@@ -54,28 +54,41 @@ func GetMultiaddrs(n *node.Node) (ma []multiaddr.Multiaddr, e error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GenerateAds takes a node.Node and creates the NodeAds matching it.
|
||||
func GenerateAds(n *node.Node, ld byte) (na *NodeAds, e error) {
|
||||
expiry := time.Now().Add(DefaultAdExpiry)
|
||||
var svcs []services2.Service
|
||||
func GetServices(n *node.Node) (svcs []services2.Service) {
|
||||
for i := range n.Services {
|
||||
svcs = append(svcs, services2.Service{
|
||||
Port: n.Services[i].Port,
|
||||
RelayRate: n.Services[i].RelayRate,
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func GetAddresses(n *node.Node) (aps []*netip.AddrPort, e error) {
|
||||
var ma []multiaddr.Multiaddr
|
||||
if ma, e = GetMultiaddrs(n); fails(e) {
|
||||
return
|
||||
}
|
||||
addrPorts := make([]*netip.AddrPort, len(ma))
|
||||
aps = make([]*netip.AddrPort, len(ma))
|
||||
for i := range ma {
|
||||
var addy netip.AddrPort
|
||||
if addy, e = multi.AddrToAddrPort(ma[i]); fails(e) {
|
||||
var a netip.AddrPort
|
||||
if a, e = multi.AddrToAddrPort(ma[i]); fails(e) {
|
||||
return
|
||||
}
|
||||
addrPorts[i] = &addy
|
||||
aps[i] = &a
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GenerateAds takes a node.Node and creates the NodeAds matching it.
|
||||
func GenerateAds(n *node.Node, ld byte) (na *NodeAds, e error) {
|
||||
expiry := time.Now().Add(DefaultAdExpiry)
|
||||
s := GetServices(n)
|
||||
ma, e := GetAddresses(n)
|
||||
if fails(e) {
|
||||
return
|
||||
}
|
||||
aps := make([]*netip.AddrPort, len(ma))
|
||||
na = &NodeAds{
|
||||
Peer: &peer.Ad{
|
||||
Ad: ad.Ad{
|
||||
@@ -91,7 +104,7 @@ func GenerateAds(n *node.Node, ld byte) (na *NodeAds, e error) {
|
||||
Key: n.Identity.Pub,
|
||||
Expiry: expiry,
|
||||
},
|
||||
Addresses: addrPorts,
|
||||
Addresses: aps,
|
||||
},
|
||||
Services: &services2.Ad{
|
||||
Ad: ad.Ad{
|
||||
@@ -99,7 +112,7 @@ func GenerateAds(n *node.Node, ld byte) (na *NodeAds, e error) {
|
||||
Key: n.Identity.Pub,
|
||||
Expiry: expiry,
|
||||
},
|
||||
Services: svcs,
|
||||
Services: s,
|
||||
},
|
||||
Load: &load.Ad{
|
||||
Ad: ad.Ad{
|
||||
|
||||
@@ -174,10 +174,10 @@ func (ng *Engine) Handler() (terminate bool) {
|
||||
ng.Shutdown()
|
||||
return true
|
||||
case c := <-ng.Listener.Accept():
|
||||
go func() {
|
||||
log.D.Ln("new connection inbound (TODO):", c.Host.Addrs())
|
||||
_ = c
|
||||
}()
|
||||
//go func() {
|
||||
log.D.Ln("new connection inbound (TODO):", c.Host.Addrs())
|
||||
_ = c
|
||||
//}()
|
||||
case b := <-ng.Mgr().ReceiveToLocalNode():
|
||||
s := splice.Load(b, slice.NewCursor())
|
||||
ng.HandleMessage(s, prev)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/indra-labs/indra/pkg/codec/ad/addresses"
|
||||
"github.com/indra-labs/indra/pkg/codec/ad/intro"
|
||||
@@ -74,6 +75,79 @@ func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message) (e error)) {
|
||||
}
|
||||
return
|
||||
}(ng)
|
||||
|
||||
go func(ng *Engine) {
|
||||
log.D.Ln("checking and updating peer information ads")
|
||||
// First time we want to do the thing straight away and update the peers
|
||||
// with a new ads.NodeAds.
|
||||
ng.gossip(time.NewTicker(time.Second))
|
||||
// Then after this we check once a second
|
||||
}(ng)
|
||||
}
|
||||
|
||||
func (ng *Engine) gossip(tick *time.Ticker) {
|
||||
now := time.Now()
|
||||
out:
|
||||
for {
|
||||
// Check for already generated NodeAds, and make them first time if
|
||||
// needed.
|
||||
na := ng.NodeAds
|
||||
log.D.S(ng.Mgr().GetLocalNodeAddresses()[0].String() + " gossip tick")
|
||||
switch {
|
||||
case na.Address == nil:
|
||||
log.D.Ln("updating peer address")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Load == nil:
|
||||
log.D.Ln("updating peer load")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Peer == nil:
|
||||
log.D.Ln("updating peer ad")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Services == nil &&
|
||||
// But only if we have any services:
|
||||
len(ng.Mgr().GetLocalNode().Services) > 0:
|
||||
log.D.Ln("updating services")
|
||||
|
||||
fallthrough
|
||||
// Next, check each entry has not expired:
|
||||
|
||||
case na.Address.Expiry.Before(now):
|
||||
log.D.Ln("updating expired peer address")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Load.Expiry.Before(now):
|
||||
log.D.Ln("updating expired load ad")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Peer.Expiry.Before(now):
|
||||
log.D.Ln("updating peer ad")
|
||||
|
||||
fallthrough
|
||||
|
||||
case na.Services.Expiry.Before(now):
|
||||
log.D.Ln("updating peer services")
|
||||
|
||||
}
|
||||
// Then, lastly, check if the ad content has changed due to
|
||||
// reconfiguration or other reasons such as a more substantial amount of
|
||||
// load or drop in load, or changed IP addresses.
|
||||
|
||||
// After all that is done, check if we are shutting down, if so exit.
|
||||
select {
|
||||
case <-ng.ctx.Done():
|
||||
break out
|
||||
case <-tick.C:
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ErrWrongTypeDecode indicates a message has the wrong magic.
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
/home/loki/.local/share/gtimelog/timelog.txt
|
||||
Reference in New Issue
Block a user