From e315ebd3ddaff333f0c76020350a6da79a2d991c Mon Sep 17 00:00:00 2001 From: l0k18 Date: Sat, 17 Jun 2023 10:10:00 +0100 Subject: [PATCH 1/8] all ads now properly validated before storing --- pkg/engine/peerstore.go | 145 ++++++++++++++++++---------- pkg/onions/adaddress/adaddress.go | 10 +- pkg/onions/adintro/adintro.go | 19 ++-- pkg/onions/adload/adload.go | 36 +++---- pkg/onions/adproto/adproto.go | 8 +- pkg/onions/adservices/adservices.go | 16 ++- 6 files changed, 132 insertions(+), 102 deletions(-) diff --git a/pkg/engine/peerstore.go b/pkg/engine/peerstore.go index 40ad83f2..89d959b1 100644 --- a/pkg/engine/peerstore.go +++ b/pkg/engine/peerstore.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/indra-labs/indra/pkg/ad" "github.com/indra-labs/indra/pkg/onions/adload" "github.com/indra-labs/indra/pkg/util/slice" "github.com/libp2p/go-libp2p/core/peer" @@ -67,61 +68,67 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { if e = c.Decode(s); fails(e) { return } + var ok bool switch c.(type) { case *adaddress.Ad: log.D.Ln("received", reflect.TypeOf(c), "from gossip network") - if addr, ok := c.(*adaddress.Ad); !ok { + var addr *adaddress.Ad + if addr, ok = c.(*adaddress.Ad); !ok { return fmt.Errorf(ErrWrongTypeDecode, adaddress.Magic, reflect.TypeOf(c).String()) - } else { - // If we got to here now we can add to the PeerStore. - log.D.S("new ad for address:", c) - var id peer.ID - if id, e = peer.IDFromPublicKey(addr.Key); fails(e) { - return - } - if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { - return - } + } else if !addr.Validate() { + return errors.New("addr ad failed validation") + } + // If we got to here now we can add to the PeerStore. + log.D.S("new ad for address:", c) + var id peer.ID + if id, e = peer.IDFromPublicKey(addr.Key); fails(e) { + return + } + if e = ng.Listener.Host. + Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + return } case *adintro.Ad: log.D.Ln("received", reflect.TypeOf(c), "from gossip network") - if intr, ok := c.(*adintro.Ad); !ok { + var intr *adintro.Ad + if intr, ok = c.(*adintro.Ad); !ok { return fmt.Errorf(ErrWrongTypeDecode, adintro.Magic, reflect.TypeOf(c).String()) - } else { - // If we got to here now we can add to the PeerStore. - log.D.S("new ad for intro:", c) - var id peer.ID - if id, e = peer.IDFromPublicKey(intr.Key); fails(e) { - return - } - if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { - return - } + } else if !intr.Validate() { + return errors.New("intro ad failed validation") + } + // If we got to here now we can add to the PeerStore. + log.D.S("new ad for intro:", c) + var id peer.ID + if id, e = peer.IDFromPublicKey(intr.Key); fails(e) { + return + } + if e = ng.Listener.Host. + Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + return } case *adload.Ad: log.D.Ln("received", reflect.TypeOf(c), "from gossip network") - if lod, ok := c.(*adload.Ad); !ok { + var lod *adload.Ad + if lod, ok = c.(*adload.Ad); !ok { return fmt.Errorf(ErrWrongTypeDecode, adaddress.Magic, reflect.TypeOf(c).String()) - } else { - // If we got to here now we can add to the PeerStore. - log.D.S("new ad for load:", c) - var id peer.ID - if id, e = peer.IDFromPublicKey(lod.Key); fails(e) { - return - } - if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { - return - } + } else if !lod.Validate() { + return errors.New("load ad failed validation") + } + // If we got to here now we can add to the PeerStore. + log.D.S("new ad for load:", c) + var id peer.ID + if id, e = peer.IDFromPublicKey(lod.Key); fails(e) { + return + } + if e = ng.Listener.Host. + Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + return } case *adpeer.Ad: log.D.Ln("received", reflect.TypeOf(c), "from gossip network") - var ok bool var pa *adpeer.Ad if pa, ok = c.(*adpeer.Ad); !ok { return fmt.Errorf(ErrWrongTypeDecode, @@ -141,21 +148,61 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { } case *adservices.Ad: log.D.Ln("received", reflect.TypeOf(c), "from gossip network") - if serv, ok := c.(*adservices.Ad); !ok { + var sa *adservices.Ad + if sa, ok = c.(*adservices.Ad); !ok { return fmt.Errorf(ErrWrongTypeDecode, adservices.Magic, reflect.TypeOf(c).String()) - } else { - // If we got to here now we can add to the PeerStore. - log.D.S("new ad for services:", c) - var id peer.ID - if id, e = peer.IDFromPublicKey(serv.Key); fails(e) { - return - } - if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { - return - } + } else if !sa.Validate() { + return errors.New("services ad failed validation") + } + // If we got to here now we can add to the PeerStore. + log.D.S("new ad for services:", c) + var id peer.ID + if id, e = peer.IDFromPublicKey(sa.Key); fails(e) { + return + } + if e = ng.Listener.Host. + Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + return } } return } + +func (ng *Engine) GetPeerRecord(id peer.ID, key string) (add ad.Ad, e error) { + var a interface{} + if a, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) { + return + } + + var ok bool + var adb slice.Bytes + if adb, ok = a.(slice.Bytes); !ok { + e = errors.New("peer record did not decode slice.Bytes") + return + } + s := splice.NewFrom(adb) + c := reg.Recognise(s) + if c == nil { + e = errors.New("message not recognised") + return + } + if e = c.Decode(s); fails(e) { + return + } + if add, ok = c.(ad.Ad); !ok { + e = errors.New("peer record did not decode as Ad") + } + return +} + +func (ng *Engine) ClearPeerRecord(id peer.ID, key string) (add ad.Ad, e error) { + if _, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) { + return + } + if e = ng.Listener.Host. + Peerstore().Put(id, key, []byte{}); fails(e) { + return + } + return +} diff --git a/pkg/onions/adaddress/adaddress.go b/pkg/onions/adaddress/adaddress.go index 2caa9628..d8a6ac18 100644 --- a/pkg/onions/adaddress/adaddress.go +++ b/pkg/onions/adaddress/adaddress.go @@ -87,11 +87,11 @@ func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return Magic } func (x *Ad) Splice(s *splice.Splice) { - x.SpliceWithoutSig(s) + x.SpliceNoSig(s) s.Signature(x.Sig) } -func (x *Ad) SpliceWithoutSig(s *splice.Splice) { +func (x *Ad) SpliceNoSig(s *splice.Splice) { var e error var ap netip.AddrPort if ap, e = multi.AddrToAddrPort(x.Addr); fails(e) { @@ -107,13 +107,13 @@ func (x *Ad) SpliceWithoutSig(s *splice.Splice) { func (x *Ad) Validate() bool { s := splice.New(Len - magic.Len) - x.SpliceWithoutSig(s) + x.SpliceNoSig(s) hash := sha256.Single(s.GetUntil(s.GetCursor())) key, e := x.Sig.Recover(hash) if fails(e) { return false } - if key.Equals(x.Key) { + if key.Equals(x.Key) && x.Expiry.After(time.Now()) { return true } return false @@ -133,7 +133,7 @@ func New(id nonce.ID, key *crypto.Prv, Addr: ma, } s := splice.New(Len) - peerAd.SpliceWithoutSig(s) + peerAd.SpliceNoSig(s) hash := sha256.Single(s.GetUntil(s.GetCursor())) var e error if peerAd.Sig, e = crypto.Sign(key, hash); fails(e) { diff --git a/pkg/onions/adintro/adintro.go b/pkg/onions/adintro/adintro.go index 27749f9f..d44aca50 100644 --- a/pkg/onions/adintro/adintro.go +++ b/pkg/onions/adintro/adintro.go @@ -125,25 +125,24 @@ func New( ) (in *Ad) { pk := crypto.DerivePub(key) - s := splice.New(Len) - IntroSplice(s, id, pk, ap, relayRate, port, expires) - hash := sha256.Single(s.GetUntil(s.GetCursor())) - var e error - var sign crypto.SigBytes - if sign, e = crypto.Sign(key, hash); fails(e) { - return nil - } + in = &Ad{ Ad: adproto.Ad{ ID: id, Key: pk, - Expiry: time.Now().Add(adproto.TTL), - Sig: sign, + Expiry: expires, }, AddrPort: ap, RelayRate: relayRate, Port: port, } + s := splice.New(in.Len()) + in.SpliceNoSig(s) + hash := sha256.Single(s.GetUntil(s.GetCursor())) + var e error + if in.Sig, e = crypto.Sign(key, hash); fails(e) { + return nil + } return } diff --git a/pkg/onions/adload/adload.go b/pkg/onions/adload/adload.go index 0ebc8dc0..2870109a 100644 --- a/pkg/onions/adload/adload.go +++ b/pkg/onions/adload/adload.go @@ -7,7 +7,6 @@ import ( "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" "github.com/indra-labs/indra/pkg/engine/sess" - "github.com/indra-labs/indra/pkg/onions/adintro" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" @@ -24,7 +23,7 @@ var ( const ( Magic = "load" - Len = adproto.Len +1 + Len = adproto.Len + 1 ) // Ad stores a specification for the fee rate and existence of a peer. @@ -39,24 +38,22 @@ var _ coding.Codec = &Ad{} func New(id nonce.ID, key *crypto.Prv, load byte, expiry time.Time) (sv *Ad) { - s := splice.New(adintro.Len) k := crypto.DerivePub(key) - Splice(s, id, k, load, expiry) - hash := sha256.Single(s.GetUntil(s.GetCursor())) - var e error - var sign crypto.SigBytes - if sign, e = crypto.Sign(key, hash); fails(e) { - return nil - } sv = &Ad{ Ad: adproto.Ad{ ID: id, Key: k, - Expiry: time.Now().Add(adproto.TTL), - Sig: sign, + Expiry: expiry, }, Load: load, } + s := splice.New(sv.Len()) + sv.SpliceNoSig(s) + hash := sha256.Single(s.GetUntil(s.GetCursor())) + var e error + if sv.Sig, e = crypto.Sign(key, hash); fails(e) { + return nil + } return } @@ -83,17 +80,6 @@ func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return "" } -func (x *Ad) Sign(prv *crypto.Prv) (e error) { - s := splice.New(x.Len()) - x.SpliceNoSig(s) - var b crypto.SigBytes - if b, e = crypto.Sign(prv, sha256.Single(s.GetUntil(s.GetCursor()))); fails(e) { - return - } - copy(x.Sig[:], b[:]) - return nil -} - func (x *Ad) Splice(s *splice.Splice) { x.SpliceNoSig(s) s.Signature(x.Sig) @@ -104,14 +90,14 @@ func (x *Ad) SpliceNoSig(s *splice.Splice) { } func (x *Ad) Validate() (valid bool) { - s := splice.New(adintro.Len - magic.Len) + s := splice.New(x.Len() - magic.Len) x.SpliceNoSig(s) hash := sha256.Single(s.GetUntil(s.GetCursor())) key, e := x.Sig.Recover(hash) if fails(e) { return false } - if key.Equals(x.Key) { + if key.Equals(x.Key) && x.Expiry.After(time.Now()) { return true } return false diff --git a/pkg/onions/adproto/adproto.go b/pkg/onions/adproto/adproto.go index d7e922e8..4705f774 100644 --- a/pkg/onions/adproto/adproto.go +++ b/pkg/onions/adproto/adproto.go @@ -76,11 +76,11 @@ func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return Magic } func (x *Ad) Splice(s *splice.Splice) { - x.SpliceWithoutSig(s) + x.SpliceNoSig(s) s.Signature(x.Sig) } -func (x *Ad) SpliceWithoutSig(s *splice.Splice) { +func (x *Ad) SpliceNoSig(s *splice.Splice) { s.Magic(Magic). ID(x.ID). Pubkey(x.Key). @@ -89,7 +89,7 @@ func (x *Ad) SpliceWithoutSig(s *splice.Splice) { func (x *Ad) Validate() bool { s := splice.New(Len) - x.SpliceWithoutSig(s) + x.SpliceNoSig(s) hash := sha256.Single(s.GetUntil(s.GetCursor())) key, e := x.Sig.Recover(hash) if fails(e) { @@ -111,7 +111,7 @@ func New(id nonce.ID, key *crypto.Prv, Expiry: expiry, } s := splice.New(Len - magic.Len) - protoAd.SpliceWithoutSig(s) + protoAd.SpliceNoSig(s) hash := sha256.Single(s.GetUntil(s.GetCursor())) var e error if protoAd.Sig, e = crypto.Sign(key, hash); fails(e) { diff --git a/pkg/onions/adservices/adservices.go b/pkg/onions/adservices/adservices.go index 4d9b4ab4..965fbf07 100644 --- a/pkg/onions/adservices/adservices.go +++ b/pkg/onions/adservices/adservices.go @@ -49,24 +49,22 @@ var _ coding.Codec = &Ad{} func New(id nonce.ID, key *crypto.Prv, services []Service, expiry time.Time) (sv *Ad) { - s := splice.New(adintro.Len) k := crypto.DerivePub(key) - ServiceSplice(s, id, k, services, expiry) - hash := sha256.Single(s.GetUntil(s.GetCursor())) - var e error - var sign crypto.SigBytes - if sign, e = crypto.Sign(key, hash); fails(e) { - return nil - } sv = &Ad{ Ad: adproto.Ad{ ID: id, Key: k, Expiry: time.Now().Add(adproto.TTL), - Sig: sign, }, Services: services, } + s := splice.New(adintro.Len) + sv.SpliceNoSig(s) + hash := sha256.Single(s.GetUntil(s.GetCursor())) + var e error + if sv.Sig, e = crypto.Sign(key, hash); fails(e) { + return nil + } return } From a980fcab4e0fce8131770fa3dc5ce7ee00d5e2b2 Mon Sep 17 00:00:00 2001 From: l0k18 Date: Sat, 17 Jun 2023 10:47:16 +0100 Subject: [PATCH 2/8] using magic also for db key --- pkg/engine/peerstore.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/engine/peerstore.go b/pkg/engine/peerstore.go index 89d959b1..0bb5af59 100644 --- a/pkg/engine/peerstore.go +++ b/pkg/engine/peerstore.go @@ -63,7 +63,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { s := splice.NewFrom(p.Data) c := reg.Recognise(s) if c == nil { - return errors.New("message not recognised") + return errors.New("ad not recognised") } if e = c.Decode(s); fails(e) { return @@ -86,7 +86,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { return } if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + Peerstore().Put(id, adaddress.Magic, s.GetAll().ToBytes()); fails(e) { return } case *adintro.Ad: @@ -105,7 +105,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { return } if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + Peerstore().Put(id, adintro.Magic, s.GetAll().ToBytes()); fails(e) { return } case *adload.Ad: @@ -124,7 +124,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { return } if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + Peerstore().Put(id, adservices.Magic, s.GetAll().ToBytes()); fails(e) { return } case *adpeer.Ad: @@ -143,7 +143,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { return } if e = ng.Listener.Host. - Peerstore().Put(id, "peer", s.GetAll().ToBytes()); fails(e) { + Peerstore().Put(id, adpeer.Magic, s.GetAll().ToBytes()); fails(e) { return } case *adservices.Ad: @@ -162,7 +162,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message, ctx context.Context) (e error) { return } if e = ng.Listener.Host. - Peerstore().Put(id, "services", s.GetAll().ToBytes()); fails(e) { + Peerstore().Put(id, adservices.Magic, s.GetAll().ToBytes()); fails(e) { return } } @@ -174,24 +174,26 @@ func (ng *Engine) GetPeerRecord(id peer.ID, key string) (add ad.Ad, e error) { if a, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) { return } - var ok bool var adb slice.Bytes if adb, ok = a.(slice.Bytes); !ok { e = errors.New("peer record did not decode slice.Bytes") return } + if len(adb) < 1 { + e = fmt.Errorf("record for peer ID %v key %s has expired", id, key) + } s := splice.NewFrom(adb) c := reg.Recognise(s) if c == nil { - e = errors.New("message not recognised") + e = errors.New(key + " peer record not recognised") return } if e = c.Decode(s); fails(e) { return } if add, ok = c.(ad.Ad); !ok { - e = errors.New("peer record did not decode as Ad") + e = errors.New(key + " peer record did not decode as Ad") } return } From 9f664db500fa153f38ed1a80d47846246f2741a8 Mon Sep 17 00:00:00 2001 From: l0k18 Date: Mon, 19 Jun 2023 09:20:03 +0100 Subject: [PATCH 3/8] remove gossip method, start ad handling for node --- pkg/ad/ad.go | 41 ------------- pkg/engine/ads.go | 72 +++++++++++++++++++++++ pkg/engine/engine.go | 19 +++--- pkg/engine/node/node.go | 4 +- pkg/engine/peerstore.go | 3 +- pkg/engine/services/services.go | 2 +- pkg/engine/sess/sessionmanager.go | 2 +- pkg/onions/adaddress/adaddress.go | 10 ---- pkg/onions/adintro/adintro.go | 8 --- pkg/onions/adload/adload.go | 4 -- pkg/onions/adpeer/adpeer.go | 4 -- pkg/onions/adproto/adproto.go | 10 ---- pkg/onions/adservices/adservices.go | 4 -- pkg/onions/balance/balance.go | 4 +- pkg/onions/exit/exit.go | 6 +- pkg/onions/forward/forward.go | 4 +- pkg/onions/getbalance/getbalance.go | 4 +- pkg/onions/hiddenservice/hiddenservice.go | 2 +- pkg/onions/introquery/introquery.go | 4 +- pkg/onions/ont/interfaces.go | 2 +- pkg/onions/response/response.go | 2 +- pkg/onions/reverse/reverse.go | 2 +- pkg/util/multi/multiaddr.go | 15 +++++ 23 files changed, 117 insertions(+), 111 deletions(-) create mode 100644 pkg/engine/ads.go diff --git a/pkg/ad/ad.go b/pkg/ad/ad.go index b89087b5..c08b2942 100644 --- a/pkg/ad/ad.go +++ b/pkg/ad/ad.go @@ -2,10 +2,7 @@ package ad import ( "github.com/indra-labs/indra/pkg/engine/coding" - "github.com/indra-labs/indra/pkg/engine/sess" log2 "github.com/indra-labs/indra/pkg/proc/log" - "github.com/indra-labs/indra/pkg/util/cryptorand" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/splice" ) @@ -20,42 +17,4 @@ type Ad interface { coding.Codec Splice(s *splice.Splice) Validate() bool - Gossip(sm *sess.Manager, c qu.C) -} - -// Gossip writes a new Ad out to the p2p network. -// -// todo: this will be changed to use the engine host peer store. An interface -// -// will be required. -func Gossip(x Ad, sm *sess.Manager, c qu.C) { - done := qu.T() - msg := splice.New(x.Len()) - if fails(x.Encode(msg)) { - return - } - nPeers := sm.NodesLen() - peerIndices := make([]int, nPeers) - for i := 1; i < nPeers; i++ { - peerIndices[i] = i - } - cryptorand.Shuffle(nPeers, func(i, j int) { - peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i] - }) - var cursor int - for { - select { - case <-c.Wait(): - return - case <-done: - return - default: - } - n := sm.FindNodeByIndex(peerIndices[cursor]) - n.Transport.Send(msg.GetAll()) - cursor++ - if cursor > len(peerIndices)-1 { - break - } - } } diff --git a/pkg/engine/ads.go b/pkg/engine/ads.go new file mode 100644 index 00000000..42146e0a --- /dev/null +++ b/pkg/engine/ads.go @@ -0,0 +1,72 @@ +package engine + +import ( + "github.com/indra-labs/indra/pkg/crypto/nonce" + "github.com/indra-labs/indra/pkg/engine/node" + "github.com/indra-labs/indra/pkg/onions/adaddress" + "github.com/indra-labs/indra/pkg/onions/adpeer" + "github.com/indra-labs/indra/pkg/onions/adproto" + "github.com/indra-labs/indra/pkg/onions/adservices" + "github.com/indra-labs/indra/pkg/util/multi" + "github.com/multiformats/go-multiaddr" + "time" +) + +const DefaultAdExpiry = time.Hour * 24 * 7 // one week + +type NodeAds struct { + Peer adpeer.Ad + Address adaddress.Ad + Services adservices.Ad +} + +func GetMultiaddr(n *node.Node) (ma multiaddr.Multiaddr, e error) { + if ma, e = multi.AddrFromAddrPort(*n.AddrPort); fails(e) { + return + } + ma = multi.AddKeyToMultiaddr(ma, n.Identity.Pub) + return +} + +func GenerateAds(n *node.Node) (na *NodeAds, e error) { + expiry := time.Now().Add(DefaultAdExpiry) + var svcs []adservices.Service + for i := range n.Services { + svcs = append(svcs, adservices.Service{ + Port: n.Services[i].Port, + RelayRate: uint32(n.Services[i].RelayRate), + }) + } + var ma multiaddr.Multiaddr + if ma, e = multi.AddrFromAddrPort(*n.AddrPort); fails(e) { + return + } + ma = multi.AddKeyToMultiaddr(ma, n.Identity.Pub) + na = &NodeAds{ + Peer: adpeer.Ad{ + Ad: adproto.Ad{ + ID: nonce.NewID(), + Key: n.Identity.Pub, + Expiry: expiry, + }, + RelayRate: n.RelayRate, + }, + Address: adaddress.Ad{ + Ad: adproto.Ad{ + ID: nonce.NewID(), + Key: n.Identity.Pub, + Expiry: expiry, + }, + Addr: ma, + }, + Services: adservices.Ad{ + Ad: adproto.Ad{ + ID: nonce.NewID(), + Key: n.Identity.Pub, + Expiry: expiry, + }, + Services: svcs, + }, + } + return +} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 63088693..6369eaae 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -77,7 +77,7 @@ func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { return } if pr != nil && on.Magic() != pr.Magic() { - log.D.S("",s.GetAll().ToBytes()) + log.D.S("", s.GetAll().ToBytes()) } m := on.GetOnion() if m == nil { @@ -102,7 +102,7 @@ func (ng *Engine) Handler() (out bool) { break case c := <-ng.Listener.Accept(): go func() { - log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()[0]) + log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()) _ = c }() case b := <-ng.Manager.ReceiveToLocalNode(): @@ -151,11 +151,11 @@ func (ng *Engine) Handler() (out bool) { return } -func (ng *Engine) Keyset() *crypto.KeySet { return ng.KeySet } -func (ng *Engine) KillSwitch() <-chan struct{} { return ng.ctx.Done() } -func (ng *Engine) Mgr() *sess.Manager { return ng.Manager } -func (ng *Engine) Pending() *responses.Pending { return ng.Responses } -func (ng *Engine) SetLoad(load byte) { ng.Load.Store(uint32(load)) } +func (ng *Engine) Keyset() *crypto.KeySet { return ng.KeySet } +func (ng *Engine) WaitForShutdown() <-chan struct{} { return ng.ctx.Done() } +func (ng *Engine) Mgr() *sess.Manager { return ng.Manager } +func (ng *Engine) Pending() *responses.Pending { return ng.Responses } +func (ng *Engine) SetLoad(load byte) { ng.Load.Store(uint32(load)) } // Shutdown triggers the shutdown of the client and the Cleanup before // finishing. @@ -183,6 +183,7 @@ func (ng *Engine) Start() { } } +// New creates a new Engine according to the Params given. func New(p Params) (c *Engine, e error) { p.Node.Transport = p.Transport p.Node.Identity = p.Keys @@ -201,7 +202,7 @@ func New(p Params) (c *Engine, e error) { h: hidden.NewHiddenrouting(), Pause: qu.T(), } - if p.Listener.Host != nil { + if p.Listener != nil && p.Listener.Host != nil { if c.PubSub, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) { cancel() return @@ -215,7 +216,7 @@ func New(p Params) (c *Engine, e error) { log.T.Ln("subscribed to", PubSubTopic, "topic on gossip network") } c.Manager.AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...) - // AddIntro a return session for receiving responses, ideally more of these + // Add return sessions for receiving responses, ideally more of these // will be generated during operation and rotated out over time. for i := 0; i < p.NReturnSessions; i++ { c.Manager.AddSession(sessions.NewSessionData(nonce.NewID(), p.Node, 0, diff --git a/pkg/engine/node/node.go b/pkg/engine/node/node.go index a3079411..f6215f1c 100644 --- a/pkg/engine/node/node.go +++ b/pkg/engine/node/node.go @@ -29,7 +29,7 @@ type Node struct { sync.Mutex AddrPort *netip.AddrPort Identity *crypto.Keys - RelayRate int // Base relay price mSAT/Mb. + RelayRate uint32 // Base relay price mSAT/Mb. Services services.Services // Services offered by this peer. payments.Chan Transport tpt.Transport @@ -38,7 +38,7 @@ type Node struct { // NewNode creates a new Node. The transport should be from either dialing out or // a peer dialing in and the self model does not need to do this. func NewNode(addr *netip.AddrPort, keys *crypto.Keys, tpt tpt.Transport, - relayRate int) (n *Node, id nonce.ID) { + relayRate uint32) (n *Node, id nonce.ID) { id = nonce.NewID() n = &Node{ ID: id, diff --git a/pkg/engine/peerstore.go b/pkg/engine/peerstore.go index 0bb5af59..8d6bad76 100644 --- a/pkg/engine/peerstore.go +++ b/pkg/engine/peerstore.go @@ -34,7 +34,6 @@ func (ng *Engine) RunAdHandler(handler func(p *pubsub.Message, for { var m *pubsub.Message var e error - log.D.Ln("waiting for next message from gossip network") if m, e = ng.sub.Next(ng.ctx); e != nil { continue } @@ -198,7 +197,7 @@ func (ng *Engine) GetPeerRecord(id peer.ID, key string) (add ad.Ad, e error) { return } -func (ng *Engine) ClearPeerRecord(id peer.ID, key string) (add ad.Ad, e error) { +func (ng *Engine) ClearPeerRecord(id peer.ID, key string) (e error) { if _, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) { return } diff --git a/pkg/engine/services/services.go b/pkg/engine/services/services.go index 2ac49b75..8779b77a 100644 --- a/pkg/engine/services/services.go +++ b/pkg/engine/services/services.go @@ -5,7 +5,7 @@ import "github.com/indra-labs/indra/pkg/engine/tpt" type ( Service struct { Port uint16 - RelayRate int + RelayRate uint32 tpt.Transport } Services []*Service diff --git a/pkg/engine/sess/sessionmanager.go b/pkg/engine/sess/sessionmanager.go index 3a1a5b7d..a3941c9a 100644 --- a/pkg/engine/sess/sessionmanager.go +++ b/pkg/engine/sess/sessionmanager.go @@ -405,7 +405,7 @@ func (sm *Manager) GetLocalNodePaymentChan() payments.Chan { } // GetLocalNodeRelayRate returns the relay rate for the local node. -func (sm *Manager) GetLocalNodeRelayRate() (rate int) { +func (sm *Manager) GetLocalNodeRelayRate() (rate uint32) { sm.Lock() defer sm.Unlock() return sm.GetLocalNode().RelayRate diff --git a/pkg/onions/adaddress/adaddress.go b/pkg/onions/adaddress/adaddress.go index d8a6ac18..0738a2fb 100644 --- a/pkg/onions/adaddress/adaddress.go +++ b/pkg/onions/adaddress/adaddress.go @@ -2,18 +2,15 @@ package adaddress import ( "fmt" - "github.com/indra-labs/indra/pkg/ad" "github.com/indra-labs/indra/pkg/crypto" "github.com/indra-labs/indra/pkg/crypto/nonce" "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/indra-labs/indra/pkg/util/multi" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/splice" "github.com/multiformats/go-multiaddr" "net/netip" @@ -75,13 +72,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return x } -func (x *Ad) Gossip(sm *sess.Manager, c qu.C) { - log.D.F("propagating peer info for %s", - x.Key.ToBased32Abbreviated()) - ad.Gossip(x, sm, c) - log.T.Ln("finished broadcasting peer info") -} - func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return Magic } diff --git a/pkg/onions/adintro/adintro.go b/pkg/onions/adintro/adintro.go index d44aca50..ceee6724 100644 --- a/pkg/onions/adintro/adintro.go +++ b/pkg/onions/adintro/adintro.go @@ -13,7 +13,6 @@ import ( "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/util/slice" "github.com/indra-labs/indra/pkg/util/splice" ) @@ -62,13 +61,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return x } -// Gossip means adding to the node's peer message list which will be gossiped by -// the libp2p network of Indra peers. -func (x *Ad) Gossip(sm *sess.Manager, c <-chan struct{}) { - log.D.F("propagating hidden service intro for %s", - x.Key.ToBased32Abbreviated()) -} - func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return Magic } diff --git a/pkg/onions/adload/adload.go b/pkg/onions/adload/adload.go index 2870109a..eb90bb86 100644 --- a/pkg/onions/adload/adload.go +++ b/pkg/onions/adload/adload.go @@ -6,11 +6,9 @@ import ( "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/splice" "reflect" "time" @@ -74,8 +72,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return nil } -func (x *Ad) Gossip(sm *sess.Manager, c qu.C) {} - func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return "" } diff --git a/pkg/onions/adpeer/adpeer.go b/pkg/onions/adpeer/adpeer.go index 780ae2e4..454d3680 100644 --- a/pkg/onions/adpeer/adpeer.go +++ b/pkg/onions/adpeer/adpeer.go @@ -6,12 +6,10 @@ import ( "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/onions/adintro" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/slice" "github.com/indra-labs/indra/pkg/util/splice" "reflect" @@ -82,8 +80,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return nil } -func (x *Ad) Gossip(sm *sess.Manager, c qu.C) {} - func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return "" } diff --git a/pkg/onions/adproto/adproto.go b/pkg/onions/adproto/adproto.go index 4705f774..e1442cf3 100644 --- a/pkg/onions/adproto/adproto.go +++ b/pkg/onions/adproto/adproto.go @@ -1,16 +1,13 @@ package adproto import ( - "github.com/indra-labs/indra/pkg/ad" "github.com/indra-labs/indra/pkg/crypto" "github.com/indra-labs/indra/pkg/crypto/nonce" "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/slice" "github.com/indra-labs/indra/pkg/util/splice" "reflect" @@ -64,13 +61,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return x } -func (x *Ad) Gossip(sm *sess.Manager, c qu.C) { - log.D.F("propagating peer info for %s", - x.Key.ToBased32Abbreviated()) - ad.Gossip(x, sm, c) - log.T.Ln("finished broadcasting peer info") -} - func (x *Ad) Len() int { return Len } func (x *Ad) Magic() string { return Magic } diff --git a/pkg/onions/adservices/adservices.go b/pkg/onions/adservices/adservices.go index 965fbf07..4b0c33c5 100644 --- a/pkg/onions/adservices/adservices.go +++ b/pkg/onions/adservices/adservices.go @@ -7,12 +7,10 @@ import ( "github.com/indra-labs/indra/pkg/crypto/sha256" "github.com/indra-labs/indra/pkg/engine/coding" "github.com/indra-labs/indra/pkg/engine/magic" - "github.com/indra-labs/indra/pkg/engine/sess" "github.com/indra-labs/indra/pkg/onions/adintro" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/reg" log2 "github.com/indra-labs/indra/pkg/proc/log" - "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/slice" "github.com/indra-labs/indra/pkg/util/splice" "time" @@ -91,8 +89,6 @@ func (x *Ad) Encode(s *splice.Splice) (e error) { func (x *Ad) GetOnion() interface{} { return nil } -func (x *Ad) Gossip(sm *sess.Manager, c qu.C) {} - func (x *Ad) Len() int { return adproto.Len + len(x.Services)*ServiceLen + slice.Uint32Len } func (x *Ad) Magic() string { return "" } diff --git a/pkg/onions/balance/balance.go b/pkg/onions/balance/balance.go index 7dbd571d..465dac37 100644 --- a/pkg/onions/balance/balance.go +++ b/pkg/onions/balance/balance.go @@ -67,9 +67,9 @@ func (x *Balance) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) { log.D.S("found pending", pending.ID) for i := range pending.Billable { session := ng.Mgr().FindSessionByPubkey(pending.Billable[i]) - out := session.Node.RelayRate * s.Len() + out := int(session.Node.RelayRate) * s.Len() if session != nil { - in := session.Node.RelayRate * pending.SentSize + in := int(session.Node.RelayRate) * pending.SentSize switch { case i < 2: ng.Mgr().DecSession(session.Header.Bytes, in, true, "reverse") diff --git a/pkg/onions/exit/exit.go b/pkg/onions/exit/exit.go index d7eb0fbc..745a599a 100644 --- a/pkg/onions/exit/exit.go +++ b/pkg/onions/exit/exit.go @@ -71,7 +71,7 @@ func (x *Exit) Account(res *sess.Data, sm *sess.Manager, res.PostAcct = append(res.PostAcct, func() { sm.DecSession(s.Header.Bytes, - s.Node.Services[j].RelayRate*len(res.B)/2, true, "exit") + int(s.Node.Services[j].RelayRate)*len(res.B)/2, true, "exit") }) break } @@ -145,8 +145,8 @@ func (x *Exit) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) { sess.Node.Unlock() continue } - in := sess.Node.Services[i].RelayRate * s.Len() / 2 - out := sess.Node.Services[i].RelayRate * rb.Len() / 2 + in := int(sess.Node.Services[i].RelayRate) * s.Len() / 2 + out := int(sess.Node.Services[i].RelayRate) * rb.Len() / 2 sess.Node.Unlock() ng.Mgr().DecSession(sess.Header.Bytes, in+out, false, "exit") break diff --git a/pkg/onions/forward/forward.go b/pkg/onions/forward/forward.go index 1817e92e..5ee6ab19 100644 --- a/pkg/onions/forward/forward.go +++ b/pkg/onions/forward/forward.go @@ -35,7 +35,7 @@ func (x *Forward) Account(res *sess.Data, sm *sess.Manager, res.Billable = append(res.Billable, s.Header.Bytes) res.PostAcct = append(res.PostAcct, func() { - sm.DecSession(s.Header.Bytes, s.Node.RelayRate*len(res.B), + sm.DecSession(s.Header.Bytes, int(s.Node.RelayRate)*len(res.B), true, "forward") }) return @@ -71,7 +71,7 @@ func (x *Forward) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) { sess := ng.Mgr().FindSessionByHeader(on1.ToPriv) if sess != nil { ng.Mgr().DecSession(sess.Header.Bytes, - ng.Mgr().GetLocalNodeRelayRate()*s.Len(), + int(ng.Mgr().GetLocalNodeRelayRate())*s.Len(), false, "forward") } } diff --git a/pkg/onions/getbalance/getbalance.go b/pkg/onions/getbalance/getbalance.go index 4f69a3c7..10763aa8 100644 --- a/pkg/onions/getbalance/getbalance.go +++ b/pkg/onions/getbalance/getbalance.go @@ -109,8 +109,8 @@ func (x *GetBalance) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error case *crypt.Crypt: sess := ng.Mgr().FindSessionByHeader(on1.ToPriv) if sess != nil { - in := sess.Node.RelayRate * s.Len() / 2 - out := sess.Node.RelayRate * len(rb) / 2 + in := int(sess.Node.RelayRate) * s.Len() / 2 + out := int(sess.Node.RelayRate) * len(rb) / 2 ng.Mgr().DecSession(sess.Header.Bytes, in+out, false, "getbalance") } } diff --git a/pkg/onions/hiddenservice/hiddenservice.go b/pkg/onions/hiddenservice/hiddenservice.go index d0be615d..301ebcb5 100644 --- a/pkg/onions/hiddenservice/hiddenservice.go +++ b/pkg/onions/hiddenservice/hiddenservice.go @@ -97,7 +97,7 @@ func (x *HiddenService) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e er }, }) log.D.Ln("stored new introduction, starting broadcast") - go x.Intro.Gossip(ng.Mgr(), ng.KillSwitch()) + // go x.Intro.Gossip(ng.Mgr(), ng.WaitForShutdown()) return } diff --git a/pkg/onions/introquery/introquery.go b/pkg/onions/introquery/introquery.go index eff895b3..7a41a59f 100644 --- a/pkg/onions/introquery/introquery.go +++ b/pkg/onions/introquery/introquery.go @@ -103,8 +103,8 @@ func (x *IntroQuery) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error case *crypt.Crypt: sess := ng.Mgr().FindSessionByHeader(on1.ToPriv) if sess != nil { - in := sess.Node.RelayRate * s.Len() / 2 - out := sess.Node.RelayRate * rb.Len() / 2 + in := int(sess.Node.RelayRate) * s.Len() / 2 + out := int(sess.Node.RelayRate) * rb.Len() / 2 ng.Mgr().DecSession(sess.Header.Bytes, in+out, false, "introquery") } } diff --git a/pkg/onions/ont/interfaces.go b/pkg/onions/ont/interfaces.go index 41af47c8..af9db97a 100644 --- a/pkg/onions/ont/interfaces.go +++ b/pkg/onions/ont/interfaces.go @@ -25,7 +25,7 @@ type Ngin interface { Mgr() *sess.Manager Pending() *responses.Pending GetHidden() *hidden.Hidden - KillSwitch() <-chan struct{} + WaitForShutdown() <-chan struct{} Keyset() *crypto.KeySet } diff --git a/pkg/onions/response/response.go b/pkg/onions/response/response.go index 576be664..15227209 100644 --- a/pkg/onions/response/response.go +++ b/pkg/onions/response/response.go @@ -94,7 +94,7 @@ func (x *Response) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) } se.Node.Unlock() } - ng.Mgr().DecSession(se.Header.Bytes, relayRate*dataSize, true, typ) + ng.Mgr().DecSession(se.Header.Bytes, int(relayRate)*dataSize, true, typ) } } ng.Pending().ProcessAndDelete(x.ID, nil, x.Bytes) diff --git a/pkg/onions/reverse/reverse.go b/pkg/onions/reverse/reverse.go index 1952d596..e144cc15 100644 --- a/pkg/onions/reverse/reverse.go +++ b/pkg/onions/reverse/reverse.go @@ -110,7 +110,7 @@ func (x *Reverse) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) { sess := ng.Mgr().FindSessionByHeader(hdr) if sess != nil { ng.Mgr().DecSession(sess.Header.Bytes, - ng.Mgr().GetLocalNodeRelayRate()*s.Len(), false, "reverse") + int(ng.Mgr().GetLocalNodeRelayRate())*s.Len(), false, "reverse") ng.HandleMessage(splice.BudgeUp(s.SetCursor(start)), on) } } else if p != nil { diff --git a/pkg/util/multi/multiaddr.go b/pkg/util/multi/multiaddr.go index 2bdac746..7b8eb78e 100644 --- a/pkg/util/multi/multiaddr.go +++ b/pkg/util/multi/multiaddr.go @@ -1,6 +1,7 @@ package multi import ( + "fmt" "github.com/indra-labs/indra/pkg/crypto" log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/libp2p/go-libp2p/core/peer" @@ -30,6 +31,20 @@ func AddrToAddrPort(ma multiaddr.Multiaddr) (ap netip.AddrPort, e error) { return } +func AddrFromAddrPort(ap netip.AddrPort) (ma multiaddr.Multiaddr, e error) { + var ipv string + if ap.Addr().Is6() { + ipv = "ip6" + } else { + ipv = "ip4" + } + if ma, e = multiaddr.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d", + ipv, ap.Addr().String(), ap.Port())); fails(e) { + return + } + return +} + func AddKeyToMultiaddr(in multiaddr.Multiaddr, pub *crypto.Pub) (ma multiaddr.Multiaddr) { var pid peer.ID var e error From b4810106ceeb87836472120d9f63fdc7b8e9e7df Mon Sep 17 00:00:00 2001 From: l0k18 Date: Mon, 19 Jun 2023 11:54:09 +0100 Subject: [PATCH 4/8] Refactor to PayChan, use Mgr() method and unexport manager, ad load ad. --- pkg/engine/{ => ads}/ads.go | 25 +++++++++++--- pkg/engine/eng_senders.go | 54 +++++++++++++++---------------- pkg/engine/eng_sessions.go | 20 ++++++------ pkg/engine/engine.go | 52 ++++++++++++++++------------- pkg/engine/engine_test.go | 22 ++++++------- pkg/engine/fail_test.go | 20 ++++++------ pkg/engine/mock.go | 10 +++--- pkg/engine/node/node.go | 4 +-- pkg/engine/payments/payments.go | 10 +++--- pkg/engine/peerstore_test.go | 12 +++---- pkg/engine/reply.go | 2 +- pkg/engine/sendgetbalance_test.go | 6 ++-- pkg/engine/sess/sessionmanager.go | 6 ++-- 13 files changed, 133 insertions(+), 110 deletions(-) rename pkg/engine/{ => ads}/ads.go (74%) diff --git a/pkg/engine/ads.go b/pkg/engine/ads/ads.go similarity index 74% rename from pkg/engine/ads.go rename to pkg/engine/ads/ads.go index 42146e0a..85db937a 100644 --- a/pkg/engine/ads.go +++ b/pkg/engine/ads/ads.go @@ -1,23 +1,31 @@ -package engine +package ads import ( "github.com/indra-labs/indra/pkg/crypto/nonce" "github.com/indra-labs/indra/pkg/engine/node" "github.com/indra-labs/indra/pkg/onions/adaddress" + "github.com/indra-labs/indra/pkg/onions/adload" "github.com/indra-labs/indra/pkg/onions/adpeer" "github.com/indra-labs/indra/pkg/onions/adproto" "github.com/indra-labs/indra/pkg/onions/adservices" + log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/indra-labs/indra/pkg/util/multi" "github.com/multiformats/go-multiaddr" "time" ) +var ( + log = log2.GetLogger() + fails = log.E.Chk +) + const DefaultAdExpiry = time.Hour * 24 * 7 // one week type NodeAds struct { Peer adpeer.Ad Address adaddress.Ad Services adservices.Ad + Load adload.Ad } func GetMultiaddr(n *node.Node) (ma multiaddr.Multiaddr, e error) { @@ -28,20 +36,19 @@ func GetMultiaddr(n *node.Node) (ma multiaddr.Multiaddr, e error) { return } -func GenerateAds(n *node.Node) (na *NodeAds, e error) { +func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { expiry := time.Now().Add(DefaultAdExpiry) var svcs []adservices.Service for i := range n.Services { svcs = append(svcs, adservices.Service{ Port: n.Services[i].Port, - RelayRate: uint32(n.Services[i].RelayRate), + RelayRate: n.Services[i].RelayRate, }) } var ma multiaddr.Multiaddr - if ma, e = multi.AddrFromAddrPort(*n.AddrPort); fails(e) { + if ma, e = GetMultiaddr(n); fails(e) { return } - ma = multi.AddKeyToMultiaddr(ma, n.Identity.Pub) na = &NodeAds{ Peer: adpeer.Ad{ Ad: adproto.Ad{ @@ -67,6 +74,14 @@ func GenerateAds(n *node.Node) (na *NodeAds, e error) { }, Services: svcs, }, + Load: adload.Ad{ + Ad: adproto.Ad{ + ID: nonce.NewID(), + Key: n.Identity.Pub, + Expiry: time.Now().Add(time.Minute * 10), + }, + Load: load, + }, } return } diff --git a/pkg/engine/eng_senders.go b/pkg/engine/eng_senders.go index 856bac02..6eecc836 100644 --- a/pkg/engine/eng_senders.go +++ b/pkg/engine/eng_senders.go @@ -29,12 +29,12 @@ func (ng *Engine) SendExit(port uint16, msg slice.Bytes, id nonce.ID, s := make(sessions.Sessions, len(hops)) s[2] = bob s[5] = alice - se := ng.Manager.SelectHops(hops, s, "exit") + se := ng.Mgr().SelectHops(hops, s, "exit") var c sessions.Circuit copy(c[:], se) o := MakeExit(exit.ExitParams{port, msg, id, bob, alice, c, ng.KeySet}) - res := PostAcctOnion(ng.Manager, o) - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) + res := PostAcctOnion(ng.Mgr(), o) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) } func (ng *Engine) SendGetBalance(alice, bob *sessions.Data, hook responses.Callback) { @@ -42,14 +42,14 @@ func (ng *Engine) SendGetBalance(alice, bob *sessions.Data, hook responses.Callb s := make(sessions.Sessions, len(hops)) s[2] = bob s[5] = alice - se := ng.Manager.SelectHops(hops, s, "sendgetbalance") + se := ng.Mgr().SelectHops(hops, s, "sendgetbalance") var c sessions.Circuit copy(c[:], se) o := MakeGetBalance(getbalance.GetBalanceParams{alice.ID, alice, bob, c, ng.KeySet}) log.D.S("sending out getbalance onion", o) - res := PostAcctOnion(ng.Manager, o) - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) + res := PostAcctOnion(ng.Mgr(), o) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) } func (ng *Engine) SendHiddenService(id nonce.ID, key *crypto.Prv, @@ -60,18 +60,18 @@ func (ng *Engine) SendHiddenService(id nonce.ID, key *crypto.Prv, hops := sess.StandardCircuit() s := make(sessions.Sessions, len(hops)) s[2] = alice - se := ng.Manager.SelectHops(hops, s, "sendhiddenservice") + se := ng.Mgr().SelectHops(hops, s, "sendhiddenservice") var c sessions.Circuit copy(c[:], se[:len(c)]) in = adintro.New(id, key, alice.Node.AddrPort, relayRate, port, expiry) o := MakeHiddenService(in, alice, bob, c, ng.KeySet) log.D.F("%s sending out hidden service onion %s", - ng.Manager.GetLocalNodeAddressString(), + ng.Mgr().GetLocalNodeAddressString(), color.Yellow.Sprint(alice.Node.AddrPort.String())) - res := PostAcctOnion(ng.Manager, o) + res := PostAcctOnion(ng.Mgr(), o) ng.GetHidden().AddHiddenService(svc, key, in, - ng.Manager.GetLocalNodeAddressString()) - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) + ng.Mgr().GetLocalNodeAddressString()) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) return } @@ -97,24 +97,24 @@ func (ng *Engine) SendIntroQuery(id nonce.ID, hsk *crypto.Pub, s := make(sessions.Sessions, len(hops)) s[2] = bob s[5] = alice - se := ng.Manager.SelectHops(hops, s, "sendintroquery") + se := ng.Mgr().SelectHops(hops, s, "sendintroquery") var c sessions.Circuit copy(c[:], se) o := MakeIntroQuery(id, hsk, bob, alice, c, ng.KeySet) - res := PostAcctOnion(ng.Manager, o) + res := PostAcctOnion(ng.Mgr(), o) log.D.Ln(res.ID) - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, fn, ng.Responses) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, fn, ng.Responses) } func (ng *Engine) SendMessage(mp *message.Message, hook responses.Callback) (id nonce.ID) { // Add another two hops for security against unmasking. preHops := []byte{0, 1} - oo := ng.Manager.SelectHops(preHops, mp.Forwards[:], "sendmessage") + oo := ng.Mgr().SelectHops(preHops, mp.Forwards[:], "sendmessage") mp.Forwards = [2]*sessions.Data{oo[0], oo[1]} o := []ont.Onion{mp} - res := PostAcctOnion(ng.Manager, o) + res := PostAcctOnion(ng.Mgr(), o) log.D.Ln("sending out message onion") - ng.Manager.SendWithOneHook(mp.Forwards[0].Node.AddrPort, res, hook, + ng.Mgr().SendWithOneHook(mp.Forwards[0].Node.AddrPort, res, hook, ng.Responses) return res.ID } @@ -123,20 +123,20 @@ func (ng *Engine) SendPing(c sessions.Circuit, hook responses.Callback) { hops := sess.StandardCircuit() s := make(sessions.Sessions, len(hops)) copy(s, c[:]) - se := ng.Manager.SelectHops(hops, s, "sendping") + se := ng.Mgr().SelectHops(hops, s, "sendping") copy(c[:], se) id := nonce.NewID() o := Ping(id, se[len(se)-1], c, ng.KeySet) - res := PostAcctOnion(ng.Manager, o) - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) + res := PostAcctOnion(ng.Mgr(), o) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) } func (ng *Engine) SendRoute(k *crypto.Pub, ap *netip.AddrPort, hook responses.Callback) { - ng.Manager.FindNodeByAddrPort(ap) + ng.Mgr().FindNodeByAddrPort(ap) var ss *sessions.Data - ng.Manager.IterateSessions(func(s *sessions.Data) bool { + ng.Mgr().IterateSessions(func(s *sessions.Data) bool { if s.Node.AddrPort.String() == ap.String() { ss = s return true @@ -144,20 +144,20 @@ func (ng *Engine) SendRoute(k *crypto.Pub, ap *netip.AddrPort, return false }) if ss == nil { - log.E.Ln(ng.Manager.GetLocalNodeAddressString(), + log.E.Ln(ng.Mgr().GetLocalNodeAddressString(), "could not find session for address", ap.String()) return } - log.D.Ln(ng.Manager.GetLocalNodeAddressString(), "sending route", + log.D.Ln(ng.Mgr().GetLocalNodeAddressString(), "sending route", k.ToBased32Abbreviated()) hops := sess.StandardCircuit() s := make(sessions.Sessions, len(hops)) s[2] = ss - se := ng.Manager.SelectHops(hops, s, "sendroute") + se := ng.Mgr().SelectHops(hops, s, "sendroute") var c sessions.Circuit copy(c[:], se) o := MakeRoute(nonce.NewID(), k, ng.KeySet, se[5], c[2], c) - res := PostAcctOnion(ng.Manager, o) + res := PostAcctOnion(ng.Mgr(), o) log.D.Ln("sending out route request onion") - ng.Manager.SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) + ng.Mgr().SendWithOneHook(c[0].Node.AddrPort, res, hook, ng.Responses) } diff --git a/pkg/engine/eng_sessions.go b/pkg/engine/eng_sessions.go index d9aa3d2d..0cd75369 100644 --- a/pkg/engine/eng_sessions.go +++ b/pkg/engine/eng_sessions.go @@ -18,7 +18,7 @@ import ( func (ng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi, fn func()) (e error) { var nodes [5]*node.Node - nodes = ng.Manager.SelectUnusedCircuit() + nodes = ng.Mgr().SelectUnusedCircuit() for i := range nodes { if nodes[i] == nil { e = fmt.Errorf("failed to find nodes %d", i) @@ -27,7 +27,7 @@ func (ng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi, } // Get a random return hop session (index 5). var returnSession *sessions.Data - returnHops := ng.Manager.GetSessionsAtHop(5) + returnHops := ng.Mgr().GetSessionsAtHop(5) if len(returnHops) > 1 { cryptorand.Shuffle(len(returnHops), func(i, j int) { returnHops[i], returnHops[j] = returnHops[j], returnHops[i] @@ -45,7 +45,7 @@ func (ng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi, var pendingConfirms int for i := range nodes { confirmChans[i] = nodes[i]. - Chan.Send(amount, s[i].ID, s[i].PreimageHash()) + PayChan.Send(amount, s[i].ID, s[i].PreimageHash()) pendingConfirms++ } var success bool @@ -78,12 +78,12 @@ func (ng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi, } // todo: handle payment failures! o := MakeSession(conf, s, returnSession, nodes[:], ng.KeySet) - res := PostAcctOnion(ng.Manager, o) - ng.Manager.SendWithOneHook(nodes[0].AddrPort, res, func(id nonce.ID, + res := PostAcctOnion(ng.Mgr(), o) + ng.Mgr().SendWithOneHook(nodes[0].AddrPort, res, func(id nonce.ID, ifc interface{}, b slice.Bytes) (e error) { - ng.Manager.Lock() - defer ng.Manager.Unlock() + ng.Mgr().Lock() + defer ng.Mgr().Unlock() var ss [5]*sessions.Data for i := range nodes { log.D.F("confirming and storing session at hop %d %s for %s with"+ @@ -93,9 +93,9 @@ func (ng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi, amount) ss[i] = sessions.NewSessionData(s[i].ID, nodes[i], amount, s[i].Header, s[i].Payload, byte(i)) - ng.Manager.Add(ss[i]) - ng.Manager.Sessions = append(ng.Manager.Sessions, ss[i]) - ng.Manager.PendingPayments.Delete(s[i].PreimageHash()) + ng.Mgr().Add(ss[i]) + ng.Mgr().Sessions = append(ng.manager.Sessions, ss[i]) + ng.Mgr().PendingPayments.Delete(s[i].PreimageHash()) } fn() return diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 6369eaae..7791db60 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -4,6 +4,7 @@ import ( "context" "github.com/indra-labs/indra/pkg/crypto" "github.com/indra-labs/indra/pkg/crypto/nonce" + "github.com/indra-labs/indra/pkg/engine/ads" "github.com/indra-labs/indra/pkg/engine/node" "github.com/indra-labs/indra/pkg/engine/responses" "github.com/indra-labs/indra/pkg/engine/sess" @@ -33,8 +34,9 @@ type ( Engine struct { ctx context.Context cancel func() - Responses *responses.Pending - Manager *sess.Manager + Responses *responses.Pending + manager *sess.Manager + NodeAds *ads.NodeAds Listener *transport.Listener PubSub *pubsub.PubSub topic *pubsub.Topic @@ -67,7 +69,7 @@ func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) } func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { log.D.F("%s handling received message", - ng.Manager.GetLocalNodeAddressString()) + ng.Mgr().GetLocalNodeAddressString()) s.SetCursor(0) s.Segments = s.Segments[:0] on := reg.Recognise(s) @@ -92,7 +94,7 @@ func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { func (ng *Engine) Handler() (out bool) { log.T.C(func() string { - return ng.Manager.GetLocalNodeAddressString() + " awaiting message" + return ng.Mgr().GetLocalNodeAddressString() + " awaiting message" }) var prev ont.Onion select { @@ -105,13 +107,13 @@ func (ng *Engine) Handler() (out bool) { log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()) _ = c }() - case b := <-ng.Manager.ReceiveToLocalNode(): + case b := <-ng.Mgr().ReceiveToLocalNode(): s := splice.Load(b, slice.NewCursor()) ng.HandleMessage(s, prev) - case p := <-ng.Manager.GetLocalNode().Chan.Receive(): + case p := <-ng.Mgr().GetLocalNode().PayChan.Receive(): log.D.F("incoming payment for %s: %v", p.ID, p.Amount) topUp := false - ng.Manager.IterateSessions(func(s *sessions.Data) bool { + ng.Mgr().IterateSessions(func(s *sessions.Data) bool { if s.Preimage == p.Preimage { s.IncSats(p.Amount, false, "top-up") topUp = true @@ -121,7 +123,7 @@ func (ng *Engine) Handler() (out bool) { return false }) if !topUp { - ng.Manager.AddPendingPayment(p) + ng.Mgr().AddPendingPayment(p) log.T.F("awaiting session keys for preimage %s session Keys %s", p.Preimage, p.ID) } @@ -129,21 +131,21 @@ func (ng *Engine) Handler() (out bool) { // a timeout on the lnd node returning the success to trigger this. p.ConfirmChan <- true case <-ng.Pause: - log.D.Ln("pausing", ng.Manager.GetLocalNodeAddressString()) + log.D.Ln("pausing", ng.Mgr().GetLocalNodeAddressString()) // For testing purposes we need to halt this Handler and discard channel // messages. out: for { select { - case <-ng.Manager.GetLocalNode().Chan.Receive(): + case <-ng.Mgr().GetLocalNode().PayChan.Receive(): log.D.Ln("discarding payments while in pause") - case <-ng.Manager.ReceiveToLocalNode(): + case <-ng.Mgr().ReceiveToLocalNode(): log.D.Ln("discarding messages while in pause") case <-ng.ctx.Done(): break out case <-ng.Pause: // This will then resume to the top level select. - log.D.Ln("unpausing", ng.Manager.GetLocalNodeAddressString()) + log.D.Ln("unpausing", ng.Mgr().GetLocalNodeAddressString()) break out } } @@ -153,14 +155,14 @@ func (ng *Engine) Handler() (out bool) { func (ng *Engine) Keyset() *crypto.KeySet { return ng.KeySet } func (ng *Engine) WaitForShutdown() <-chan struct{} { return ng.ctx.Done() } -func (ng *Engine) Mgr() *sess.Manager { return ng.Manager } +func (ng *Engine) Mgr() *sess.Manager { return ng.manager } func (ng *Engine) Pending() *responses.Pending { return ng.Responses } func (ng *Engine) SetLoad(load byte) { ng.Load.Store(uint32(load)) } // Shutdown triggers the shutdown of the client and the Cleanup before // finishing. func (ng *Engine) Shutdown() { - log.T.Ln("shutting down", ng.Manager.GetLocalNodeAddress().String()) + log.T.Ln("shutting down", ng.Mgr().GetLocalNodeAddress().String()) if ng.ShuttingDown.Load() { return } @@ -184,7 +186,7 @@ func (ng *Engine) Start() { } // New creates a new Engine according to the Params given. -func New(p Params) (c *Engine, e error) { +func New(p Params) (ng *Engine, e error) { p.Node.Transport = p.Transport p.Node.Identity = p.Keys var ks *crypto.KeySet @@ -192,34 +194,40 @@ func New(p Params) (c *Engine, e error) { return } ctx, cancel := context.WithCancel(context.Background()) - c = &Engine{ + ng = &Engine{ ctx: ctx, cancel: cancel, Responses: &responses.Pending{}, KeySet: ks, Listener: p.Listener, - Manager: sess.NewSessionManager(), + manager: sess.NewSessionManager(), h: hidden.NewHiddenrouting(), Pause: qu.T(), } if p.Listener != nil && p.Listener.Host != nil { - if c.PubSub, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) { + if ng.PubSub, e = pubsub.NewGossipSub(ctx, p.Listener.Host); fails(e) { cancel() return } - if c.topic, e = c.PubSub.Join(PubSubTopic); fails(e) { + if ng.topic, e = ng.PubSub.Join(PubSubTopic); fails(e) { + cancel() return } - if c.sub, e = c.topic.Subscribe(); fails(e) { + if ng.sub, e = ng.topic.Subscribe(); fails(e) { + cancel() return } log.T.Ln("subscribed to", PubSubTopic, "topic on gossip network") } - c.Manager.AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...) + if ng.NodeAds, e = ads.GenerateAds(p.Node, 25); fails(e) { + cancel() + return + } + ng.Mgr().AddNodes(append([]*node.Node{p.Node}, p.Nodes...)...) // Add return sessions for receiving responses, ideally more of these // will be generated during operation and rotated out over time. for i := 0; i < p.NReturnSessions; i++ { - c.Manager.AddSession(sessions.NewSessionData(nonce.NewID(), p.Node, 0, + ng.Mgr().AddSession(sessions.NewSessionData(nonce.NewID(), p.Node, 0, nil, nil, 5)) } return diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index 12edf677..10dbc6ac 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -35,12 +35,12 @@ func TestClient_SendExit(t *testing.T) { t.FailNow() } client := clients[0] - log.D.Ln("client", client.Manager.GetLocalNodeAddressString()) + log.D.Ln("client", client.Mgr().GetLocalNodeAddressString()) // set up forwarding port service const port = 3455 sim := transport.NewByteChan(0) for i := range clients { - e = clients[i].Manager.AddServiceToLocalNode(&services.Service{ + e = clients[i].Mgr().AddServiceToLocalNode(&services.Service{ Port: port, Transport: sim, RelayRate: 58000, @@ -66,7 +66,7 @@ func TestClient_SendExit(t *testing.T) { t.Error("Exit test failed") }() out: - for i := 3; i < len(clients[0].Manager.Sessions)-1; i++ { + for i := 3; i < len(clients[0].Mgr().Sessions)-1; i++ { wg.Add(1) var msg slice.Bytes if msg, _, e = tests.GenMessage(64, "request"); fails(e) { @@ -80,8 +80,8 @@ out: t.Error(e) t.FailNow() } - bob := clients[0].Manager.Sessions[i] - returnHops := client.Manager.GetSessionsAtHop(5) + bob := clients[0].Mgr().Sessions[i] + returnHops := client.Mgr().GetSessionsAtHop(5) var alice *sessions.Data if len(returnHops) > 1 { cryptorand.Shuffle(len(returnHops), func(i, j int) { @@ -105,7 +105,7 @@ out: }) bb := <-clients[3].Mgr().GetLocalNode().ReceiveFrom(port) log.T.S(bb.ToBytes()) - if e = clients[3].Manager.SendFromLocalNode(port, respMsg); fails(e) { + if e = clients[3].Mgr().SendFromLocalNode(port, respMsg); fails(e) { t.Error("fail send") } log.T.Ln("response sent") @@ -151,11 +151,11 @@ func TestClient_SendPing(t *testing.T) { t.Error("SendPing test failed") }() out: - for i := 3; i < len(clients[0].Manager.Sessions)-1; i++ { + for i := 3; i < len(clients[0].Mgr().Sessions)-1; i++ { wg.Add(1) var c sessions.Circuit - sess := clients[0].Manager.Sessions[i] - c[sess.Hop] = clients[0].Manager.Sessions[i] + sess := clients[0].Mgr().Sessions[i] + c[sess.Hop] = clients[0].Mgr().Sessions[i] clients[0].SendPing(c, func(id nonce.ID, ifc interface{}, b slice.Bytes) (e error) { log.D.Ln("success") @@ -219,8 +219,8 @@ func TestClient_SendSessionKeys(t *testing.T) { counter.Dec() } wg.Wait() - for j := range clients[0].Manager.CircuitCache { - log.D.F("%d %s %v", i, j, clients[0].Manager.CircuitCache[j]) + for j := range clients[0].Mgr().CircuitCache { + log.D.F("%d %s %v", i, j, clients[0].Mgr().CircuitCache[j]) } quit.Q() } diff --git a/pkg/engine/fail_test.go b/pkg/engine/fail_test.go index e6192ec1..1fe502a3 100644 --- a/pkg/engine/fail_test.go +++ b/pkg/engine/fail_test.go @@ -44,7 +44,7 @@ func TestEngine_Message(t *testing.T) { t.FailNow() } client := clients[0] - log.D.Ln("client", client.Manager.GetLocalNodeAddressString()) + log.D.Ln("client", client.Mgr().GetLocalNodeAddressString()) // Start up the clients. for _, v := range clients { go v.Start() @@ -89,9 +89,9 @@ func TestEngine_Message(t *testing.T) { } id := nonce.NewID() _ = id - introducerHops := client.Manager.GetSessionsAtHop(2) + introducerHops := client.Mgr().GetSessionsAtHop(2) var introducer *sessions.Data - returnHops := client.Manager.GetSessionsAtHop(5) + returnHops := client.Mgr().GetSessionsAtHop(5) var returner *sessions.Data _ = returner if len(introducerHops) > 1 { @@ -112,7 +112,7 @@ func TestEngine_Message(t *testing.T) { returner = returnHops[0] log.D.Ln("getting sessions for introducer...") for i := range clients { - if introducer.Node.ID == clients[i].Manager.GetLocalNode().ID { + if introducer.Node.ID == clients[i].Mgr().GetLocalNode().ID { for j := 0; j < nCircuits; j++ { wg.Add(1) counter.Inc() @@ -217,7 +217,7 @@ func TestEngine_Route(t *testing.T) { t.FailNow() } client := clients[0] - log.W.Ln("client", client.Manager.GetLocalNodeAddressString()) + log.W.Ln("client", client.Mgr().GetLocalNodeAddressString()) // Start up the clients. for _, v := range clients { go v.Start() @@ -262,9 +262,9 @@ func TestEngine_Route(t *testing.T) { } id := nonce.NewID() _ = id - introducerHops := client.Manager.GetSessionsAtHop(2) + introducerHops := client.Mgr().GetSessionsAtHop(2) var introducer *sessions.Data - returnHops := client.Manager.GetSessionsAtHop(5) + returnHops := client.Mgr().GetSessionsAtHop(5) var returner *sessions.Data _ = returner if len(introducerHops) > 1 { @@ -286,7 +286,7 @@ func TestEngine_Route(t *testing.T) { const localPort = 25234 log.D.Ln("getting sessions for introducer...") for i := range clients { - if introducer.Node.ID == clients[i].Manager.GetLocalNode().ID { + if introducer.Node.ID == clients[i].Mgr().GetLocalNode().ID { for j := 0; j < nCircuits; j++ { wg.Add(1) counter.Inc() @@ -397,8 +397,8 @@ func TestEngine_SendHiddenService(t *testing.T) { return } id := nonce.NewID() - introducerHops := clients[0].Manager.GetSessionsAtHop(2) - returnHops := clients[0].Manager.GetSessionsAtHop(5) + introducerHops := clients[0].Mgr().GetSessionsAtHop(2) + returnHops := clients[0].Mgr().GetSessionsAtHop(5) var introducer *sessions.Data if len(introducerHops) > 1 { cryptorand.Shuffle(len(introducerHops), func(i, j int) { diff --git a/pkg/engine/mock.go b/pkg/engine/mock.go index a518399d..f6dd719c 100644 --- a/pkg/engine/mock.go +++ b/pkg/engine/mock.go @@ -60,8 +60,8 @@ func createNMockCircuits(inclSessions bool, nCircuits int, }); fails(e) { return } - cl[i].Manager.SetLocalNodeAddress(nodes[i].AddrPort) - cl[i].Manager.SetLocalNode(nodes[i]) + cl[i].Mgr().SetLocalNodeAddress(nodes[i].AddrPort) + cl[i].Mgr().SetLocalNode(nodes[i]) if inclSessions { // Create a session for all but the first. if i > 0 { @@ -69,11 +69,11 @@ func createNMockCircuits(inclSessions bool, nCircuits int, 1<<16, nil, nil, byte((i-1)/nCircuits)) // AddIntro session to node, so it will be able to relay if it // gets a message with the key. - cl[i].Manager.AddSession(ss[i-1]) + cl[i].Mgr().AddSession(ss[i-1]) // we need a copy for the node so the balance adjustments don't // double up. s := *ss[i-1] - cl[0].Manager.AddSession(&s) + cl[0].Mgr().AddSession(&s) } } } @@ -83,7 +83,7 @@ func createNMockCircuits(inclSessions bool, nCircuits int, if i == j { continue } - cl[i].Manager.AddNodes(nodes[j]) + cl[i].Mgr().AddNodes(nodes[j]) } } return diff --git a/pkg/engine/node/node.go b/pkg/engine/node/node.go index f6215f1c..13c494c5 100644 --- a/pkg/engine/node/node.go +++ b/pkg/engine/node/node.go @@ -31,7 +31,7 @@ type Node struct { Identity *crypto.Keys RelayRate uint32 // Base relay price mSAT/Mb. Services services.Services // Services offered by this peer. - payments.Chan + payments.PayChan Transport tpt.Transport } @@ -45,7 +45,7 @@ func NewNode(addr *netip.AddrPort, keys *crypto.Keys, tpt tpt.Transport, AddrPort: addr, Identity: keys, RelayRate: relayRate, - Chan: make(payments.Chan, PaymentChanBuffers), + PayChan: make(payments.PayChan, PaymentChanBuffers), Transport: tpt, } return diff --git a/pkg/engine/payments/payments.go b/pkg/engine/payments/payments.go index fa295619..f17ac595 100644 --- a/pkg/engine/payments/payments.go +++ b/pkg/engine/payments/payments.go @@ -11,7 +11,7 @@ func (p PendingPayments) Add(np *Payment) (pp PendingPayments) { } type ( - Chan chan *Payment + PayChan chan *Payment Payment struct { ID nonce.ID Preimage sha256.Hash @@ -53,11 +53,11 @@ func (p PendingPayments) FindPreimage(pi sha256.Hash) (pp *Payment) { return } -// Receive waits on receiving a Payment on a Chan. -func (pc Chan) Receive() <-chan *Payment { return pc } +// Receive waits on receiving a Payment on a PayChan. +func (pc PayChan) Receive() <-chan *Payment { return pc } -// Send a payment on the Chan. -func (pc Chan) Send(amount lnwire.MilliSatoshi, +// Send a payment on the PayChan. +func (pc PayChan) Send(amount lnwire.MilliSatoshi, id nonce.ID, preimage sha256.Hash) (confirmChan chan bool) { confirmChan = make(chan bool) pc <- &Payment{ diff --git a/pkg/engine/peerstore_test.go b/pkg/engine/peerstore_test.go index 12cbb862..8adca689 100644 --- a/pkg/engine/peerstore_test.go +++ b/pkg/engine/peerstore_test.go @@ -44,7 +44,7 @@ func TestEngine_PeerStore(t *testing.T) { } time.Sleep(time.Second) newAddressAd := adaddress.New(nonce.NewID(), - engines[0].Manager.GetLocalNodeIdentityPrv(), + engines[0].Mgr().GetLocalNodeIdentityPrv(), engines[0].Listener.Host.Addrs()[0], time.Now().Add(time.Hour*24*7)) sa := splice.New(newAddressAd.Len()) @@ -56,8 +56,8 @@ func TestEngine_PeerStore(t *testing.T) { } time.Sleep(time.Second) newIntroAd := adintro.New(nonce.NewID(), - engines[0].Manager.GetLocalNodeIdentityPrv(), - engines[0].Manager.GetLocalNodeAddress(), + engines[0].Mgr().GetLocalNodeIdentityPrv(), + engines[0].Mgr().GetLocalNodeAddress(), 20000, 443, time.Now().Add(time.Hour*24*7)) si := splice.New(newIntroAd.Len()) @@ -69,7 +69,7 @@ func TestEngine_PeerStore(t *testing.T) { } time.Sleep(time.Second) newLoadAd := adload.New(nonce.NewID(), - engines[0].Manager.GetLocalNodeIdentityPrv(), + engines[0].Mgr().GetLocalNodeIdentityPrv(), 17, time.Now().Add(time.Hour*24*7)) sl := splice.New(newLoadAd.Len()) @@ -81,7 +81,7 @@ func TestEngine_PeerStore(t *testing.T) { } time.Sleep(time.Second) newPeerAd := adpeer.New(nonce.NewID(), - engines[0].Manager.GetLocalNodeIdentityPrv(), + engines[0].Mgr().GetLocalNodeIdentityPrv(), 20000, time.Now().Add(time.Hour*24*7)) log.D.S("peer ad", newPeerAd) @@ -94,7 +94,7 @@ func TestEngine_PeerStore(t *testing.T) { } time.Sleep(time.Second * 1) newServiceAd := adservices.New(nonce.NewID(), - engines[0].Manager.GetLocalNodeIdentityPrv(), + engines[0].Mgr().GetLocalNodeIdentityPrv(), []adservices.Service{{20000, 54321}}, time.Now().Add(time.Hour*24*7)) ss := splice.New(newServiceAd.Len()) diff --git a/pkg/engine/reply.go b/pkg/engine/reply.go index 5e550121..4b5111c9 100644 --- a/pkg/engine/reply.go +++ b/pkg/engine/reply.go @@ -13,7 +13,7 @@ func MakeReplyHeader(ng *Engine) (returnHeader *hidden.ReplyHeader) { rvKeys := ng.KeySet.Next3() hops := []byte{3, 4, 5} s := make(sessions.Sessions, len(hops)) - ng.Manager.SelectHops(hops, s, "make message reply header") + ng.Mgr().SelectHops(hops, s, "make message reply header") rt := &exit.Routing{ Sessions: [3]*sessions.Data{s[0], s[1], s[2]}, Keys: crypto.Privs{rvKeys[0], rvKeys[1], rvKeys[2]}, diff --git a/pkg/engine/sendgetbalance_test.go b/pkg/engine/sendgetbalance_test.go index 160cda80..304eb7fa 100644 --- a/pkg/engine/sendgetbalance_test.go +++ b/pkg/engine/sendgetbalance_test.go @@ -27,7 +27,7 @@ func TestClient_SendGetBalance(t *testing.T) { t.FailNow() } client := clients[0] - log.D.Ln("client", client.Manager.GetLocalNodeAddressString()) + log.D.Ln("client", client.Mgr().GetLocalNodeAddressString()) // Start up the clients. for _, v := range clients { go v.Start() @@ -46,7 +46,7 @@ func TestClient_SendGetBalance(t *testing.T) { }() i := 0 wg.Add(1) - returnHops := client.Manager.GetSessionsAtHop(5) + returnHops := client.Mgr().GetSessionsAtHop(5) var returner *sessions.Data if len(returnHops) > 1 { cryptorand.Shuffle(len(returnHops), func(i, j int) { @@ -55,7 +55,7 @@ func TestClient_SendGetBalance(t *testing.T) { }) } returner = returnHops[0] - clients[0].SendGetBalance(returner, clients[0].Manager.Sessions[i], + clients[0].SendGetBalance(returner, clients[0].Mgr().Sessions[i], func(cf nonce.ID, ifc interface{}, b slice.Bytes) (e error) { log.I.Ln("success") wg.Done() diff --git a/pkg/engine/sess/sessionmanager.go b/pkg/engine/sess/sessionmanager.go index a3941c9a..ff530bf6 100644 --- a/pkg/engine/sess/sessionmanager.go +++ b/pkg/engine/sess/sessionmanager.go @@ -399,9 +399,9 @@ func (sm *Manager) GetLocalNodeIdentityPrv() (ident *crypto.Prv) { return sm.GetLocalNode().Identity.Prv } -// GetLocalNodePaymentChan returns the engine's local Node Chan. -func (sm *Manager) GetLocalNodePaymentChan() payments.Chan { - return sm.nodes[0].Chan +// GetLocalNodePaymentChan returns the engine's local Node PayChan. +func (sm *Manager) GetLocalNodePaymentChan() payments.PayChan { + return sm.nodes[0].PayChan } // GetLocalNodeRelayRate returns the relay rate for the local node. From 55f806c9f4247bbc54cd37c4a1dfd42101d23614 Mon Sep 17 00:00:00 2001 From: l0k18 Date: Mon, 19 Jun 2023 17:07:54 +0100 Subject: [PATCH 5/8] relay and client launcher started --- cmd/indra/client/client.go | 18 ++++++++++ cmd/indra/relay.go | 68 -------------------------------------- cmd/indra/relay/relay.go | 17 ++++++++++ cmd/indra/root.go | 6 +++- cmd/indra/version.go | 2 +- pkg/engine/ads/ads.go | 57 +++++++++++++++++++++++++++----- pkg/engine/engine.go | 6 ++-- pkg/engine/node/node.go | 3 +- pkg/p2p/configure.go | 2 +- 9 files changed, 96 insertions(+), 83 deletions(-) create mode 100644 cmd/indra/client/client.go delete mode 100644 cmd/indra/relay.go create mode 100644 cmd/indra/relay/relay.go diff --git a/cmd/indra/client/client.go b/cmd/indra/client/client.go new file mode 100644 index 00000000..25b49e37 --- /dev/null +++ b/cmd/indra/client/client.go @@ -0,0 +1,18 @@ +package client + +import ( + "github.com/spf13/cobra" +) + +func Init(c *cobra.Command) { + c.AddCommand(clientCommand) +} + +var clientCommand = &cobra.Command{ + Use: "client", + Short: "run a client", + Long: "Runs indra as a client, providing a wireguard tunnel and socks5 " + + "proxy as connectivity options", + Run: func(cmd *cobra.Command, args []string) { + }, +} diff --git a/cmd/indra/relay.go b/cmd/indra/relay.go deleted file mode 100644 index d8974623..00000000 --- a/cmd/indra/relay.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -// -// import ( -// "github.com/spf13/cobra" -// "github.com/spf13/viper" -// -// "github.com/indra-labs/indra" -// "github.com/indra-labs/indra/pkg/crypto/key/prv" -// "github.com/indra-labs/indra/pkg/crypto/key/pub" -// "github.com/indra-labs/indra/pkg/interrupt" -// log2 "github.com/indra-labs/indra/pkg/proc/log" -// "github.com/indra-labs/indra/pkg/relay" -// "github.com/indra-labs/indra/pkg/relay/transport" -// "github.com/indra-labs/indra/pkg/util/slice" -// ) -// -// var ( -// eng *relay.Engine -// engineP2P []string -// engineRPC []string -// ) -// -// func init() { -// pf := relayCmd.PersistentFlags() -// pf.StringSliceVarP(&engineP2P, "engineP2P-relay", "P", -// []string{"127.0.0.1:8337", "::1:8337"}, -// "address/ports for IPv4 and v6 listeners") -// pf.StringSliceVarP(&engineRPC, "relay-control", "r", -// []string{"127.0.0.1:8339", "::1:8339"}, -// "address/ports for IPv4 and v6 listeners") -// viper.BindPFlag("engineP2P-relay", seedCommand.PersistentFlags().Lookup("engineP2P-relay")) -// viper.BindPFlag("relay-control", seedCommand.PersistentFlags().Lookup( -// "relay-control")) -// rootCmd.AddCommand(relayCmd) -// } -// -// var relayCmd = &cobra.Command{ -// Use: "relay", -// Short: "Runs a relay server.", -// Long: `Runs a server that can be controlled with RPC and CLI interfaces.`, -// Run: func(cmd *cobra.Command, args []string) { -// -// log2.SetLogLevel(log2.Debug) -// -// log.I.Ln("-- ", log2.App, "("+viper.GetString(""+ -// "network")+") -", -// indra.SemVer, "- Network Freedom. --") -// -// var e error -// var idPrv *prv.HiddenService -// if idPrv, e = prv.GenerateKey(); check(e) { -// return -// } -// idPub := pub.Derive(idPrv) -// nTotal := 5 -// tpt := transport.NewSim(nTotal) -// addr := slice.GenerateRandomAddrPortIPv4() -// nod, _ := relay.NewNode(addr, idPub, idPrv, tpt, 50000, true) -// eng, e = relay.NewEngine(tpt, idPrv, nod, nil, 5) -// interrupt.AddHandler(func() { eng.Q() }) -// log.D.Ln("starting up server") -// eng.Start() -// eng.Wait() -// log.I.Ln("fin") -// return -// }, -// } diff --git a/cmd/indra/relay/relay.go b/cmd/indra/relay/relay.go new file mode 100644 index 00000000..8a884a6c --- /dev/null +++ b/cmd/indra/relay/relay.go @@ -0,0 +1,17 @@ +package relay + +import ( + "github.com/spf13/cobra" +) + +func Init(c *cobra.Command) { + c.AddCommand(relayCommand) +} + +var relayCommand = &cobra.Command{ + Use: "relay", + Short: "run a relay", + Long: "Runs indra as a full relay, with optional client", + Run: func(cmd *cobra.Command, args []string) { + }, +} diff --git a/cmd/indra/root.go b/cmd/indra/root.go index 9937a652..e1c3ca01 100644 --- a/cmd/indra/root.go +++ b/cmd/indra/root.go @@ -3,6 +3,8 @@ package main import ( "errors" "github.com/indra-labs/indra" + "github.com/indra-labs/indra/cmd/indra/client" + "github.com/indra-labs/indra/cmd/indra/relay" "github.com/indra-labs/indra/cmd/indra/seed" log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/spf13/cobra" @@ -13,7 +15,7 @@ import ( var indraTxt = `indra (` + indra.SemVer + `) - Network Freedom. -indra is a lightning powered, distributed virtual private network for anonymising traffic on decentralised protocol networks. +indra is a lightning powered, distributed virtual private network that protects you from metadata collection and enables novel service models. ` var ( @@ -55,6 +57,8 @@ func init() { viper.BindPFlag("data-dir", rootCmd.PersistentFlags().Lookup("data-dir")) viper.BindPFlag("network", rootCmd.PersistentFlags().Lookup("network")) + relay.Init(rootCmd) + client.Init(rootCmd) seed.Init(rootCmd) } diff --git a/cmd/indra/version.go b/cmd/indra/version.go index 782d89c7..b91790e4 100644 --- a/cmd/indra/version.go +++ b/cmd/indra/version.go @@ -14,7 +14,7 @@ func init() { var versionCmd = &cobra.Command{ Use: "version", Short: "Prints the version number", - Long: `All software has versions. This is Hugo's`, + Long: `All software has versions. This is mine. Semver formatted.`, Run: func(cmd *cobra.Command, args []string) { fmt.Println(indra.SemVer) }, diff --git a/pkg/engine/ads/ads.go b/pkg/engine/ads/ads.go index 85db937a..c2d2f435 100644 --- a/pkg/engine/ads/ads.go +++ b/pkg/engine/ads/ads.go @@ -1,8 +1,12 @@ package ads import ( + "errors" + "github.com/indra-labs/indra/pkg/crypto" "github.com/indra-labs/indra/pkg/crypto/nonce" "github.com/indra-labs/indra/pkg/engine/node" + "github.com/indra-labs/indra/pkg/engine/payments" + "github.com/indra-labs/indra/pkg/engine/services" "github.com/indra-labs/indra/pkg/onions/adaddress" "github.com/indra-labs/indra/pkg/onions/adload" "github.com/indra-labs/indra/pkg/onions/adpeer" @@ -11,6 +15,7 @@ import ( log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/indra-labs/indra/pkg/util/multi" "github.com/multiformats/go-multiaddr" + "net/netip" "time" ) @@ -22,10 +27,10 @@ var ( const DefaultAdExpiry = time.Hour * 24 * 7 // one week type NodeAds struct { - Peer adpeer.Ad - Address adaddress.Ad - Services adservices.Ad - Load adload.Ad + Peer *adpeer.Ad + Address *adaddress.Ad + Services *adservices.Ad + Load *adload.Ad } func GetMultiaddr(n *node.Node) (ma multiaddr.Multiaddr, e error) { @@ -50,7 +55,7 @@ func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { return } na = &NodeAds{ - Peer: adpeer.Ad{ + Peer: &adpeer.Ad{ Ad: adproto.Ad{ ID: nonce.NewID(), Key: n.Identity.Pub, @@ -58,7 +63,7 @@ func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { }, RelayRate: n.RelayRate, }, - Address: adaddress.Ad{ + Address: &adaddress.Ad{ Ad: adproto.Ad{ ID: nonce.NewID(), Key: n.Identity.Pub, @@ -66,7 +71,7 @@ func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { }, Addr: ma, }, - Services: adservices.Ad{ + Services: &adservices.Ad{ Ad: adproto.Ad{ ID: nonce.NewID(), Key: n.Identity.Pub, @@ -74,7 +79,7 @@ func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { }, Services: svcs, }, - Load: adload.Ad{ + Load: &adload.Ad{ Ad: adproto.Ad{ ID: nonce.NewID(), Key: n.Identity.Pub, @@ -85,3 +90,39 @@ func GenerateAds(n *node.Node, load byte) (na *NodeAds, e error) { } return } + +const ErrNilNodeAds = "cannot process nil NodeAds" + +func NodeFromAds(a *NodeAds) (n *node.Node, e error) { + if a == nil || + a.Services == nil || a.Load == nil || + a.Address == nil || a.Peer == nil { + return n, errors.New(ErrNilNodeAds) + } + var ap netip.AddrPort + if ap, e = multi.AddrToAddrPort(a.Address.Addr); fails(e) { + return + } + var svcs services.Services + for i := range a.Services.Services { + svcs = append(svcs, &services.Service{ + Port: a.Services.Services[i].Port, + RelayRate: a.Services.Services[i].RelayRate, + Transport: nil, // todo: wen making? + }) + } + n = &node.Node{ + ID: nonce.NewID(), + AddrPort: &ap, + Identity: &crypto.Keys{ + Pub: a.Address.Key, + Bytes: a.Address.Key.ToBytes(), + }, + RelayRate: a.Peer.RelayRate, + Services: svcs, + Load: a.Load.Load, + PayChan: make(payments.PayChan, node.PaymentChanBuffers), // todo: other end stuff + Transport: nil, // this is populated when we dial it. + } + return +} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 7791db60..ceb0065b 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -34,9 +34,9 @@ type ( Engine struct { ctx context.Context cancel func() - Responses *responses.Pending - manager *sess.Manager - NodeAds *ads.NodeAds + Responses *responses.Pending + manager *sess.Manager + NodeAds *ads.NodeAds Listener *transport.Listener PubSub *pubsub.PubSub topic *pubsub.Topic diff --git a/pkg/engine/node/node.go b/pkg/engine/node/node.go index 13c494c5..1dcb9a42 100644 --- a/pkg/engine/node/node.go +++ b/pkg/engine/node/node.go @@ -29,8 +29,9 @@ type Node struct { sync.Mutex AddrPort *netip.AddrPort Identity *crypto.Keys - RelayRate uint32 // Base relay price mSAT/Mb. + RelayRate uint32 // Base relay price mSAT/Mb. Services services.Services // Services offered by this peer. + Load byte payments.PayChan Transport tpt.Transport } diff --git a/pkg/p2p/configure.go b/pkg/p2p/configure.go index c428a542..eb9aa724 100644 --- a/pkg/p2p/configure.go +++ b/pkg/p2p/configure.go @@ -1,9 +1,9 @@ package p2p import ( + "github.com/dgraph-io/badger/v3" "github.com/indra-labs/indra/pkg/cfg" "github.com/indra-labs/indra/pkg/storage" - "github.com/dgraph-io/badger/v3" "github.com/libp2p/go-libp2p/core/crypto" "github.com/multiformats/go-multiaddr" "github.com/spf13/viper" From adae9504cd312f119bd95dd01bb5ce716894dfd5 Mon Sep 17 00:00:00 2001 From: l0k18 Date: Tue, 20 Jun 2023 09:26:06 +0100 Subject: [PATCH 6/8] some more detailed info on how to use the scripts for development --- cmd/indra/client/client.go | 1 + cmd/indra/relay/relay.go | 28 +++++++++++++++++++++++++--- scripts/cdwork | 4 ---- scripts/cdwork.sh | 7 +++++++ scripts/readme.md | 11 +++++++++++ scripts/run.sh | 3 +++ scripts/test.sh | 2 +- scripts/testpkg.sh | 2 +- scripts/trace.sh | 3 +++ 9 files changed, 52 insertions(+), 9 deletions(-) delete mode 100755 scripts/cdwork create mode 100755 scripts/cdwork.sh create mode 100644 scripts/readme.md create mode 100755 scripts/run.sh create mode 100755 scripts/trace.sh diff --git a/cmd/indra/client/client.go b/cmd/indra/client/client.go index 25b49e37..d1a074c5 100644 --- a/cmd/indra/client/client.go +++ b/cmd/indra/client/client.go @@ -14,5 +14,6 @@ var clientCommand = &cobra.Command{ Long: "Runs indra as a client, providing a wireguard tunnel and socks5 " + "proxy as connectivity options", Run: func(cmd *cobra.Command, args []string) { + }, } diff --git a/cmd/indra/relay/relay.go b/cmd/indra/relay/relay.go index 8a884a6c..848ec974 100644 --- a/cmd/indra/relay/relay.go +++ b/cmd/indra/relay/relay.go @@ -1,17 +1,39 @@ package relay import ( + "github.com/indra-labs/indra" + log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/spf13/cobra" ) +var ( + log = log2.GetLogger() + check = log.E.Chk +) + +var ( + wireguardEnable bool + wireguardCIDR string + socksEnable bool + socksListener string +) + func Init(c *cobra.Command) { + relayCommand.PersistentFlags().BoolVarP(&wireguardEnable, "wireguard", + "w", false, "enable wiregfuard tunnel") + relayCommand.PersistentFlags().BoolVarP(&socksEnable, "socks", + "s", false, "enable socks proxy") + relayCommand.PersistentFlags().StringVar(&socksListener, "socks-listener", + "localhost:8080", "set address for socks 5 proxy listener") + c.AddCommand(relayCommand) } var relayCommand = &cobra.Command{ Use: "relay", Short: "run a relay", - Long: "Runs indra as a full relay, with optional client", - Run: func(cmd *cobra.Command, args []string) { - }, + Long: "Runs indra as a full relay, with optional client.", + Run: func(cmd *cobra.Command, args []string) { + log.I.Ln(log2.App.Load(), indra.SemVer) + }, } diff --git a/scripts/cdwork b/scripts/cdwork deleted file mode 100755 index 05476202..00000000 --- a/scripts/cdwork +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/zsh -cd /home/loki/work/loki/indra-labs/indra -export PATH=/home/loki/work/loki/indra-labs/indra/scripts:$PATH -zsh \ No newline at end of file diff --git a/scripts/cdwork.sh b/scripts/cdwork.sh new file mode 100755 index 00000000..4cd89112 --- /dev/null +++ b/scripts/cdwork.sh @@ -0,0 +1,7 @@ +#!/usr/bin/zsh +# populate ~/.workpath thus: `cd path/to/repo/root; pwd>~/.workpath +export INDRAROOT=$(cat ~/.workpath) +export PATH=$INDRAROOT/scripts:$PATH +# put the path of the root of the repository in ./scripts/path +cd $INDRAROOT +zsh \ No newline at end of file diff --git a/scripts/readme.md b/scripts/readme.md new file mode 100644 index 00000000..8a613059 --- /dev/null +++ b/scripts/readme.md @@ -0,0 +1,11 @@ +# scripts + +populate ~/.workpath thus: `cd path/to/repo/root; pwd>~/.workpath` + +add this to your `~/.bashrc` or `~/.zshrc`: + + export PATH=$(cat ~/.workpath)/scripts:$HOME/sdk/go1.19.10/bin:$PATH + export GOBIN=$HOME/.local/bin + +`source` the `rc` file or open a new terminal session and type `cdwork.sh` and you will have a number of useful commands +that handle paths and special build parameters to make the code locations work without hard coding them anywhere. \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 00000000..39ec4aab --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env zsh +reset +go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 --logs-level=trace $2 $3 $4 $5 \ No newline at end of file diff --git a/scripts/test.sh b/scripts/test.sh index 25cd4939..b95aff8f 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,3 +1,3 @@ #!/usr/bin/env zsh reset -go test -v -tags local -gcflags "all=-trimpath=/home/loki/work/loki/indra-labs/indra" $1 $2 $3 $4 $5 \ No newline at end of file +go test -v -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file diff --git a/scripts/testpkg.sh b/scripts/testpkg.sh index 9c8a2b0e..ce636cd2 100755 --- a/scripts/testpkg.sh +++ b/scripts/testpkg.sh @@ -1,3 +1,3 @@ #!/usr/bin/env zsh cd pkg -go test -v -tags local -gcflags "all=-trimpath=/home/loki/work/loki/indra-labs/indra" ./... \ No newline at end of file +go test -v -tags local -gcflags "all=-trimpath=$INDRAROOT" ./... \ No newline at end of file diff --git a/scripts/trace.sh b/scripts/trace.sh new file mode 100755 index 00000000..298e9733 --- /dev/null +++ b/scripts/trace.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env zsh +reset +go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file From 94ec23242a4744fe21fe14514c5d7a55421fd2de Mon Sep 17 00:00:00 2001 From: l0k18 Date: Tue, 20 Jun 2023 10:01:06 +0100 Subject: [PATCH 7/8] furthur improvements --- cmd/indra/relay/relay.go | 4 ++++ cmd/indra/root.go | 9 +++------ pkg/engine/sendgetbalance_test.go | 2 +- pkg/proc/log/log.go | 2 +- scripts/cdwork.sh | 2 +- scripts/readme.md | 0 scripts/run.sh | 2 +- scripts/runci.sh | 3 +++ scripts/testci.sh | 3 +++ scripts/trace.sh | 2 +- 10 files changed, 18 insertions(+), 11 deletions(-) mode change 100644 => 100755 scripts/readme.md create mode 100755 scripts/runci.sh create mode 100755 scripts/testci.sh diff --git a/cmd/indra/relay/relay.go b/cmd/indra/relay/relay.go index 848ec974..f632f437 100644 --- a/cmd/indra/relay/relay.go +++ b/cmd/indra/relay/relay.go @@ -35,5 +35,9 @@ var relayCommand = &cobra.Command{ Long: "Runs indra as a full relay, with optional client.", Run: func(cmd *cobra.Command, args []string) { log.I.Ln(log2.App.Load(), indra.SemVer) + nw, _ := cmd.Parent().PersistentFlags().GetString("network") + var dd string + dd, _ = cmd.Parent().PersistentFlags().GetString("data-dir") + log.T.S("cmd", dd, nw, args) }, } diff --git a/cmd/indra/root.go b/cmd/indra/root.go index e1c3ca01..2906b5bb 100644 --- a/cmd/indra/root.go +++ b/cmd/indra/root.go @@ -7,6 +7,7 @@ import ( "github.com/indra-labs/indra/cmd/indra/relay" "github.com/indra-labs/indra/cmd/indra/seed" log2 "github.com/indra-labs/indra/pkg/proc/log" + "github.com/indra-labs/indra/pkg/util/appdata" "github.com/spf13/cobra" "github.com/spf13/viper" "os" @@ -49,7 +50,7 @@ func init() { rootCmd.PersistentFlags().BoolVarP(&cfgSave, "config-save", "", false, "saves the config file with any eligible envs/flags passed") rootCmd.PersistentFlags().StringVarP(&logsDir, "logs-dir", "L", "", "logging directory (default is $HOME/.indra/logs)") rootCmd.PersistentFlags().StringVarP(&logsLevel, "logs-level", "", "info", "set logging level off|fatal|error|warn|info|check|debug|trace") - rootCmd.PersistentFlags().StringVarP(&dataDir, "data-dir", "D", "", "data directory (default is $HOME/.indra/data)") + rootCmd.PersistentFlags().StringVarP(&dataDir, "data-dir", "D", appdata.Dir("indra", false), "data directory (default is $HOME/.indra/data)") rootCmd.PersistentFlags().StringVarP(&network, "network", "N", "mainnet", "selects the network mainnet|testnet|simnet") viper.BindPFlag("logs-dir", rootCmd.PersistentFlags().Lookup("logs-dir")) @@ -65,11 +66,7 @@ func init() { func initData() { if viper.GetString("data-dir") == "" { - home, err := os.UserHomeDir() - - cobra.CheckErr(err) - - viper.Set("data-dir", home+"/.indra/data") + viper.Set("data-dir", appdata.Dir("indra", false)) } } diff --git a/pkg/engine/sendgetbalance_test.go b/pkg/engine/sendgetbalance_test.go index 304eb7fa..1cecd59a 100644 --- a/pkg/engine/sendgetbalance_test.go +++ b/pkg/engine/sendgetbalance_test.go @@ -57,7 +57,7 @@ func TestClient_SendGetBalance(t *testing.T) { returner = returnHops[0] clients[0].SendGetBalance(returner, clients[0].Mgr().Sessions[i], func(cf nonce.ID, ifc interface{}, b slice.Bytes) (e error) { - log.I.Ln("success") + log.D.Ln("success") wg.Done() quit.Q() return diff --git a/pkg/proc/log/log.go b/pkg/proc/log/log.go index ef617394..4ccb61f8 100644 --- a/pkg/proc/log/log.go +++ b/pkg/proc/log/log.go @@ -403,7 +403,7 @@ func logPrint( ), LevelSpecs[level].Colorizer(loc), printFunc(), - LevelSpecs[level].Colorizer(timeText), + color.Black.Sprint(timeText), ) s = strings.TrimSuffix(s, "\n") fmt.Fprintln(writer, s) diff --git a/scripts/cdwork.sh b/scripts/cdwork.sh index 4cd89112..e73033fd 100755 --- a/scripts/cdwork.sh +++ b/scripts/cdwork.sh @@ -4,4 +4,4 @@ export INDRAROOT=$(cat ~/.workpath) export PATH=$INDRAROOT/scripts:$PATH # put the path of the root of the repository in ./scripts/path cd $INDRAROOT -zsh \ No newline at end of file +zsh diff --git a/scripts/readme.md b/scripts/readme.md old mode 100644 new mode 100755 diff --git a/scripts/run.sh b/scripts/run.sh index 39ec4aab..298e9733 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -1,3 +1,3 @@ #!/usr/bin/env zsh reset -go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 --logs-level=trace $2 $3 $4 $5 \ No newline at end of file +go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file diff --git a/scripts/runci.sh b/scripts/runci.sh new file mode 100755 index 00000000..53176867 --- /dev/null +++ b/scripts/runci.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env zsh +reset +go run -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file diff --git a/scripts/testci.sh b/scripts/testci.sh new file mode 100755 index 00000000..81210f01 --- /dev/null +++ b/scripts/testci.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env zsh +reset +go test -v -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file diff --git a/scripts/trace.sh b/scripts/trace.sh index 298e9733..39ec4aab 100755 --- a/scripts/trace.sh +++ b/scripts/trace.sh @@ -1,3 +1,3 @@ #!/usr/bin/env zsh reset -go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 $2 $3 $4 $5 \ No newline at end of file +go run -tags local -gcflags "all=-trimpath=$INDRAROOT" $1 --logs-level=trace $2 $3 $4 $5 \ No newline at end of file From e943991f8fe0e522a4fe05753a0af1e25fd6a590 Mon Sep 17 00:00:00 2001 From: l0k18 Date: Tue, 20 Jun 2023 12:14:21 +0100 Subject: [PATCH 8/8] viper flags and tidier bits in engine --- cmd/indra/relay/relay.go | 14 ++- pkg/engine/engine.go | 261 +++++++++++++++++++-------------------- 2 files changed, 140 insertions(+), 135 deletions(-) diff --git a/cmd/indra/relay/relay.go b/cmd/indra/relay/relay.go index f632f437..40b6a519 100644 --- a/cmd/indra/relay/relay.go +++ b/cmd/indra/relay/relay.go @@ -4,6 +4,7 @@ import ( "github.com/indra-labs/indra" log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/spf13/cobra" + "github.com/spf13/viper" ) var ( @@ -13,18 +14,22 @@ var ( var ( wireguardEnable bool - wireguardCIDR string + wireguardCIDR string // todo: there must be something like this. default route to 1 socksEnable bool socksListener string ) func Init(c *cobra.Command) { relayCommand.PersistentFlags().BoolVarP(&wireguardEnable, "wireguard", - "w", false, "enable wiregfuard tunnel") + "w", false, "enable wireguard tunnel") relayCommand.PersistentFlags().BoolVarP(&socksEnable, "socks", "s", false, "enable socks proxy") - relayCommand.PersistentFlags().StringVar(&socksListener, "socks-listener", - "localhost:8080", "set address for socks 5 proxy listener") + relayCommand.PersistentFlags().StringVarP(&socksListener, "socks-listener", + "l", "localhost:8080", "set address for socks 5 proxy listener") + + viper.BindPFlag("wireguard", relayCommand.PersistentFlags().Lookup("wireguard")) + viper.BindPFlag("socks", relayCommand.PersistentFlags().Lookup("socks")) + viper.BindPFlag("socks-listener", relayCommand.PersistentFlags().Lookup("socks-listener")) c.AddCommand(relayCommand) } @@ -33,6 +38,7 @@ var relayCommand = &cobra.Command{ Use: "relay", Short: "run a relay", Long: "Runs indra as a full relay, with optional client.", + Run: func(cmd *cobra.Command, args []string) { log.I.Ln(log2.App.Load(), indra.SemVer) nw, _ := cmd.Parent().PersistentFlags().GetString("network") diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index ceb0065b..d4621169 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -48,143 +48,15 @@ type ( ShuttingDown atomic.Bool } Params struct { - Transport tpt.Transport - Listener *transport.Listener - *crypto.Keys + Transport tpt.Transport + Listener *transport.Listener + Keys *crypto.Keys Node *node.Node Nodes []*node.Node NReturnSessions int } ) -// Cleanup closes and flushes any resources the client opened that require sync -// in order to reopen correctly. -func (ng *Engine) Cleanup() { - // Do cleanup stuff before shutdown. -} - -func (ng *Engine) GetHidden() *hidden.Hidden { return ng.h } - -func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) } - -func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { - log.D.F("%s handling received message", - ng.Mgr().GetLocalNodeAddressString()) - s.SetCursor(0) - s.Segments = s.Segments[:0] - on := reg.Recognise(s) - if on != nil { - log.D.Ln("magic", on.Magic()) - if fails(on.Decode(s)) { - return - } - if pr != nil && on.Magic() != pr.Magic() { - log.D.S("", s.GetAll().ToBytes()) - } - m := on.GetOnion() - if m == nil { - log.D.Ln("did not get onion") - return - } - if fails(m.(ont.Onion).Handle(s, pr, ng)) { - log.W.S("unrecognised packet", s.GetAll().ToBytes()) - } - } -} - -func (ng *Engine) Handler() (out bool) { - log.T.C(func() string { - return ng.Mgr().GetLocalNodeAddressString() + " awaiting message" - }) - var prev ont.Onion - select { - case <-ng.ctx.Done(): - ng.Shutdown() - out = true - break - case c := <-ng.Listener.Accept(): - go func() { - log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()) - _ = c - }() - case b := <-ng.Mgr().ReceiveToLocalNode(): - s := splice.Load(b, slice.NewCursor()) - ng.HandleMessage(s, prev) - case p := <-ng.Mgr().GetLocalNode().PayChan.Receive(): - log.D.F("incoming payment for %s: %v", p.ID, p.Amount) - topUp := false - ng.Mgr().IterateSessions(func(s *sessions.Data) bool { - if s.Preimage == p.Preimage { - s.IncSats(p.Amount, false, "top-up") - topUp = true - log.T.F("topping up %x with %v", s.Header.Bytes, p.Amount) - return true - } - return false - }) - if !topUp { - ng.Mgr().AddPendingPayment(p) - log.T.F("awaiting session keys for preimage %s session Keys %s", - p.Preimage, p.ID) - } - // For now if we received this we return true. Later this will wait with - // a timeout on the lnd node returning the success to trigger this. - p.ConfirmChan <- true - case <-ng.Pause: - log.D.Ln("pausing", ng.Mgr().GetLocalNodeAddressString()) - // For testing purposes we need to halt this Handler and discard channel - // messages. - out: - for { - select { - case <-ng.Mgr().GetLocalNode().PayChan.Receive(): - log.D.Ln("discarding payments while in pause") - case <-ng.Mgr().ReceiveToLocalNode(): - log.D.Ln("discarding messages while in pause") - case <-ng.ctx.Done(): - break out - case <-ng.Pause: - // This will then resume to the top level select. - log.D.Ln("unpausing", ng.Mgr().GetLocalNodeAddressString()) - break out - } - } - } - return -} - -func (ng *Engine) Keyset() *crypto.KeySet { return ng.KeySet } -func (ng *Engine) WaitForShutdown() <-chan struct{} { return ng.ctx.Done() } -func (ng *Engine) Mgr() *sess.Manager { return ng.manager } -func (ng *Engine) Pending() *responses.Pending { return ng.Responses } -func (ng *Engine) SetLoad(load byte) { ng.Load.Store(uint32(load)) } - -// Shutdown triggers the shutdown of the client and the Cleanup before -// finishing. -func (ng *Engine) Shutdown() { - log.T.Ln("shutting down", ng.Mgr().GetLocalNodeAddress().String()) - if ng.ShuttingDown.Load() { - return - } - ng.ShuttingDown.Store(true) - ng.Cleanup() - ng.cancel() -} - -// Start a single thread of the Engine. -func (ng *Engine) Start() { - log.T.Ln("starting engine") - if ng.sub != nil { - log.T.Ln("starting gossip handling") - ng.RunAdHandler(ng.HandleAd) - } - for { - if ng.Handler() { - break - } - } -} - // New creates a new Engine according to the Params given. func New(p Params) (ng *Engine, e error) { p.Node.Transport = p.Transport @@ -232,3 +104,130 @@ func New(p Params) (ng *Engine, e error) { } return } + +// Cleanup closes and flushes any resources the client opened that require sync +// in order to reopen correctly. +func (ng *Engine) Cleanup() { + // Do cleanup stuff before shutdown. +} + +func (ng *Engine) GetHidden() *hidden.Hidden { return ng.h } + +func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) } + +func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { + log.D.F("%s handling received message", + ng.Mgr().GetLocalNodeAddressString()) + s.SetCursor(0) + s.Segments = s.Segments[:0] + on := reg.Recognise(s) + if on != nil { + log.D.Ln("magic", on.Magic()) + if fails(on.Decode(s)) { + return + } + if pr != nil && on.Magic() != pr.Magic() { + log.D.S("", s.GetAll().ToBytes()) + } + m := on.GetOnion() + if m == nil { + log.D.Ln("did not get onion") + return + } + if fails(m.(ont.Onion).Handle(s, pr, ng)) { + log.W.S("unrecognised packet", s.GetAll().ToBytes()) + } + } +} + +func (ng *Engine) Handler() (terminate bool) { + log.T.C(func() string { + return ng.Mgr().GetLocalNodeAddressString() + " awaiting message" + }) + var prev ont.Onion + select { + case <-ng.ctx.Done(): + ng.Shutdown() + return true + case c := <-ng.Listener.Accept(): + go func() { + log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()) + _ = c + }() + case b := <-ng.Mgr().ReceiveToLocalNode(): + s := splice.Load(b, slice.NewCursor()) + ng.HandleMessage(s, prev) + case p := <-ng.Mgr().GetLocalNode().PayChan.Receive(): + log.D.F("incoming payment for %s: %v", p.ID, p.Amount) + topUp := false + ng.Mgr().IterateSessions(func(s *sessions.Data) bool { + if s.Preimage == p.Preimage { + s.IncSats(p.Amount, false, "top-up") + topUp = true + log.T.F("topping up %x with %v", s.Header.Bytes, p.Amount) + return true + } + return false + }) + if !topUp { + ng.Mgr().AddPendingPayment(p) + log.T.F("awaiting session keys for preimage %s session Keys %s", + p.Preimage, p.ID) + } + // For now if we received this we return true. Later this will wait with a + // timeout on the ln node returning the success to trigger this. + p.ConfirmChan <- true + case <-ng.Pause: + log.D.Ln("pausing", ng.Mgr().GetLocalNodeAddressString()) + // For testing purposes we need to halt this Handler and discard channel + // messages. + out: + for { + select { + case <-ng.Mgr().GetLocalNode().PayChan.Receive(): + log.D.Ln("discarding payments while in pause") + case <-ng.Mgr().ReceiveToLocalNode(): + log.D.Ln("discarding messages while in pause") + case <-ng.ctx.Done(): + return true + case <-ng.Pause: + // This will then resume to the top level select. + log.D.Ln("unpausing", ng.Mgr().GetLocalNodeAddressString()) + break out + } + } + } + return +} + +func (ng *Engine) Keyset() *crypto.KeySet { return ng.KeySet } +func (ng *Engine) WaitForShutdown() <-chan struct{} { return ng.ctx.Done() } +func (ng *Engine) Mgr() *sess.Manager { return ng.manager } +func (ng *Engine) Pending() *responses.Pending { return ng.Responses } +func (ng *Engine) SetLoad(load byte) { ng.Load.Store(uint32(load)) } + +// Shutdown triggers the shutdown of the client and the Cleanup before +// finishing. +func (ng *Engine) Shutdown() { + log.T.Ln("shutting down", ng.Mgr().GetLocalNodeAddress().String()) + if ng.ShuttingDown.Load() { + return + } + ng.ShuttingDown.Store(true) + ng.Cleanup() + ng.cancel() +} + +// Start a single thread of the Engine. +func (ng *Engine) Start() { + log.T.Ln("starting engine") + if ng.sub != nil { + log.T.Ln("starting gossip handling") + ng.RunAdHandler(ng.HandleAd) + } + for { + if ng.Handler() { + break + } + } +}