peer metadata store test improved

A problem has come up: the package we are using does not expose the iterator of the underlying kv store, so a wrapper will be made that embeds it and adds this functionality to the exposed back end.
This commit is contained in:
l0k18
2023-07-21 11:22:36 +01:00
parent a44ae2ab0e
commit 26005c9c2e
26 changed files with 3770 additions and 12 deletions
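As a rough sketch of the wrapper described in the commit message above: embed the datastore-backed peerstore together with the raw kv store it was built on, and add the missing iterator. This is a minimal illustration under assumptions; the package name, the IterablePeerstore type, the Backing field and the Iterate method are hypothetical, not the committed implementation.

// Hypothetical sketch, not part of this commit.
package engine // package name assumed

import (
	"context"

	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
	pstore "github.com/libp2p/go-libp2p/core/peerstore"
)

// IterablePeerstore embeds the peerstore and the ds.Batching store it was
// constructed from, so callers can iterate records even though the peerstore
// interface exposes no iterator.
type IterablePeerstore struct {
	pstore.Peerstore             // all normal peerstore methods pass through
	Backing          ds.Batching // the kv store the peerstore was created with
}

// Iterate walks every key/value pair under prefix and hands it to fn.
func (p *IterablePeerstore) Iterate(prefix string, fn func(key string, value []byte) error) error {
	results, err := p.Backing.Query(context.TODO(), query.Query{Prefix: prefix})
	if err != nil {
		return err
	}
	defer results.Close()
	for r := range results.Next() {
		if r.Error != nil {
			return r.Error
		}
		if err := fn(r.Key, r.Value); err != nil {
			return err
		}
	}
	return nil
}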

View File

@@ -191,6 +191,11 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
log.E.Ln("received slice of no length")
return
}
if p.ReceivedFrom == ng.Listener.Host.ID() {
// Not sure why the Next function delivers these but they are not
// needed.
return
}
s := splice.NewFrom(p.Data)
c := reg.Recognise(s)
if c == nil {
@@ -202,8 +207,6 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
var ok bool
switch c.(type) {
case *addresses.Ad:
log.D.Ln(ng.LogEntry(fmt.Sprint("received ", reflect.TypeOf(c),
" from gossip network")))
var addr *addresses.Ad
if addr, ok = c.(*addresses.Ad); !ok {
return fmt.Errorf(ErrWrongTypeDecode,
@@ -211,6 +214,12 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !addr.Validate() {
return errors.New("addr ad failed validation")
}
// No need to store our own (why does pubsub do this?)
if addr.Key.Fingerprint() == ng.Listener.Pub.Fingerprint() {
break
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", addr.Key.Fingerprint())
// If we got here, we can now add the ad to the PeerStore.
var id peer.ID
if id, e = peer.IDFromPublicKey(addr.Key); fails(e) {
@@ -230,6 +239,10 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !intr.Validate() {
return errors.New("intro ad failed validation")
}
// We don't need to store introductions we are hosting again.
if intr.Introducer.Fingerprint() == ng.Listener.Pub.Fingerprint() {
break
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", intr.Key.Fingerprint())
// If we got here, we can now add the ad to the PeerStore.
@@ -249,6 +262,9 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !lod.Validate() {
return errors.New("load ad failed validation")
}
if lod.Key.Fingerprint() == ng.Listener.Pub.Fingerprint() {
break
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", lod.Key.Fingerprint())
// If we got here, we can now add the ad to the PeerStore.
@@ -258,7 +274,7 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
}
log.T.Ln(ng.LogEntry("storing ad"))
if e = ng.Listener.Host.
Peerstore().Put(id, services.Magic, s.GetAll().ToBytes()); fails(e) {
Peerstore().Put(id, load.Magic, s.GetAll().ToBytes()); fails(e) {
return
}
case *peer2.Ad:
@@ -269,6 +285,9 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !pa.Validate() {
return errors.New("peer ad failed validation")
}
if pa.Key.Fingerprint() == ng.Listener.Pub.Fingerprint() {
break
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", pa.Key.Fingerprint())
// If we got here, we can now add the ad to the PeerStore.
@@ -288,6 +307,9 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
} else if !sa.Validate() {
return errors.New("services ad failed validation")
}
if sa.Key.Fingerprint() == ng.Listener.Pub.Fingerprint() {
break
}
log.D.Ln(ng.LogEntry("received"), reflect.TypeOf(c),
"from gossip network for node", sa.Key.Fingerprint())
// If we got here, we can now add the ad to the PeerStore.
@@ -335,15 +357,18 @@ func (ng *Engine) GetPeerRecord(id peer.ID, key string) (add cert.Act, e error)
return
}
func (ng *Engine) IteratePeerRecords(func()) {
}
// ClearPeerRecord deletes a peer record by overwriting it with an empty slice.
//
// todo: these should be purged from the peerstore in a GC pass.
// todo: these should be purged from the peerstore in a GC pass. Expiry and storage limits...
func (ng *Engine) ClearPeerRecord(id peer.ID, key string) (e error) {
if _, e = ng.Listener.Host.Peerstore().Get(id, key); fails(e) {
return
}
if e = ng.Listener.Host.
Peerstore().Put(id, key, []byte{}); fails(e) {
if e = ng.Listener.Host.Peerstore().Put(id, key, []byte{}); fails(e) {
return
}
return
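The IteratePeerRecords stub above is left with an empty body. Building on the hypothetical IterablePeerstore sketched under the commit message (imports as there, plus go-base32 and core/peer), one possible shape is the following; the "/peers/metadata" prefix mirrors the pstoreds layout added later in this commit, and nothing here is the committed implementation.

// Hypothetical sketch only: walk all persisted peer metadata records,
// decoding the base32 peer ID segment of each key, and hand them to fn.
func IteratePeerRecords(ps *IterablePeerstore, fn func(id peer.ID, key string, value []byte) error) error {
	return ps.Iterate("/peers/metadata", func(k string, v []byte) error {
		// assumed key layout: /peers/metadata/<b32 peer id no padding>/<key>
		dsk := ds.RawKey(k)
		idb, err := base32.RawStdEncoding.DecodeString(dsk.Parent().Name())
		if err != nil {
			return err
		}
		id, err := peer.IDFromBytes(idb)
		if err != nil {
			return err
		}
		return fn(id, dsk.Name(), v)
	})
}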

View File

@@ -95,7 +95,9 @@ func pauza() {
func TestEngine_PeerStoreDiscovery(t *testing.T) {
// This test doesn't have a concrete failure mode, as inevitably the nodes
// that start in the first half of the set, and the nodes in the second
// half, the latter may miss the former's advertisements.
// half, the latter may miss the former's advertisements. If the functions
// are broken, however, this will fail, as these are the conditions that
// trigger a failure.
//
// It is more for demonstration to see that the gossip is indeed propagating
// in the first round as nodes start up, and more comprehensive tests by
@@ -104,7 +106,7 @@ func TestEngine_PeerStoreDiscovery(t *testing.T) {
if indra.CI == "false" {
log2.SetLogLevel(log2.Trace)
}
const nTotal = 20
const nTotal = 10
var e error
var engines []*Engine
var cleanup func()
@@ -112,8 +114,20 @@ func TestEngine_PeerStoreDiscovery(t *testing.T) {
if engines, cleanup, e = CreateAndStartMockEngines(nTotal, ctx); fails(e) {
t.FailNow()
}
_ = engines
time.Sleep(time.Second * 8)
time.Sleep(time.Second * 3)
// Send them all again after a bit to make sure everyone gets them.
for i := range engines {
if e = engines[i].SendAds(); fails(e) {
t.FailNow()
}
}
time.Sleep(time.Second * 3)
for i := range engines {
_ = i
// check that all peers now have nTotal-1 distinct peer ads (of all 4
// types); a sketch of this check is given after this hunk
}
cleanup()
pauza()
}
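A minimal sketch of the check referenced in the loop above, written as it might appear inside the test body and using only the libp2p Peerstore API already present in this commit; the per-ad-type lookup is left as a comment because the exact magic constants differ per ad type.

// Hypothetical sketch, not part of this commit: after the second round of
// ads, every engine should know about the nTotal-1 other peers.
for i := range engines {
	known := 0
	for _, id := range engines[i].Listener.Host.Peerstore().Peers() {
		if id == engines[i].Listener.Host.ID() {
			continue
		}
		// For a stricter check, Peerstore().Get(id, <ad magic>) could be
		// queried here for each of the ad types handled in HandleAd.
		known++
	}
	if known < nTotal-1 {
		t.Errorf("engine %d only knows %d of %d peers", i, known, nTotal-1)
	}
}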

View File

@@ -87,7 +87,6 @@ func (l *Listener) Tick(h host.Host, rendezvous []multiaddr.Multiaddr,
peers <-chan peer.AddrInfo, disco *routing.RoutingDiscovery,
ctx context.Context) (e error) {
log.T.Ln()
for i := range rendezvous {
if peers, e = disco.FindPeers(ctx,
rendezvous[i].String()); fails(e) {

View File

@@ -0,0 +1,617 @@
package pstoreds
import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds/pb"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
b32 "github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
)
type ttlWriteMode int
const (
ttlOverride ttlWriteMode = iota
ttlExtend
)
var (
log = logging.Logger("peerstore/ds")
// Peer addresses are stored under the following db key pattern:
// /peers/addrs/<b32 peer id no padding>
addrBookBase = ds.NewKey("/peers/addrs")
)
// addrsRecord decorates the AddrBookRecord with locks and metadata.
type addrsRecord struct {
sync.RWMutex
*pb.AddrBookRecord
dirty bool
}
// flush writes the record to the datastore by calling ds.Put, unless the record is
// marked for deletion, in which case we call ds.Delete. To be called within a lock.
func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString(r.Id))
if len(r.Addrs) == 0 {
if err = write.Delete(context.TODO(), key); err == nil {
r.dirty = false
}
return err
}
data, err := proto.Marshal(r)
if err != nil {
return err
}
if err = write.Put(context.TODO(), key, data); err != nil {
return err
}
// write succeeded; record is no longer dirty.
r.dirty = false
return nil
}
// clean is called on records to perform housekeeping. The return value indicates if the record was changed
// as a result of this call.
//
// clean does the following:
// * sorts addresses by expiration (soonest expiring first).
// * removes expired addresses.
//
// It short-circuits optimistically when there's nothing to do.
//
// clean is called from several points:
// * when accessing an entry.
// * when performing periodic GC.
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
//
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
func (r *addrsRecord) clean(now time.Time) (chgd bool) {
nowUnix := now.Unix()
addrsLen := len(r.Addrs)
if !r.dirty && !r.hasExpiredAddrs(nowUnix) {
// record is not dirty, and we have no expired entries to purge.
return false
}
if addrsLen == 0 {
// this is a ghost record; let's signal it has to be written.
// flush() will take care of doing the deletion.
return true
}
if r.dirty && addrsLen > 1 {
sort.Slice(r.Addrs, func(i, j int) bool {
return r.Addrs[i].Expiry < r.Addrs[j].Expiry
})
}
r.Addrs = removeExpired(r.Addrs, nowUnix)
return r.dirty || len(r.Addrs) != addrsLen
}
func (r *addrsRecord) hasExpiredAddrs(now int64) bool {
if len(r.Addrs) > 0 && r.Addrs[0].Expiry <= now {
return true
}
return false
}
func removeExpired(entries []*pb.AddrBookRecord_AddrEntry, now int64) []*pb.AddrBookRecord_AddrEntry {
// since addresses are sorted by expiration, we find the first
// survivor and split the slice on its index.
pivot := -1
for i, addr := range entries {
if addr.Expiry > now {
break
}
pivot = i
}
return entries[pivot+1:]
}
// dsAddrBook is an address book backed by a Datastore with a GC procedure to purge expired entries. It uses an
// in-memory address stream manager. See NewAddrBook for more information.
type dsAddrBook struct {
ctx context.Context
opts Options
cache cache[peer.ID, *addrsRecord]
ds ds.Batching
gc *dsAddrBookGc
subsManager *pstoremem.AddrSubManager
// controls children goroutine lifetime.
childrenDone sync.WaitGroup
cancelFn func()
clock clock
}
type clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
}
type realclock struct{}
func (rc realclock) Now() time.Time {
return time.Now()
}
func (rc realclock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
var _ pstore.AddrBook = (*dsAddrBook)(nil)
var _ pstore.CertifiedAddrBook = (*dsAddrBook)(nil)
// NewAddrBook initializes a new datastore-backed address book. It serves as a drop-in replacement for pstoremem
// (memory-backed peerstore), and works with any datastore implementing the ds.Batching interface.
//
// Addresses and peer records are serialized into protobuf, storing one datastore entry per peer, along with metadata
// to control address expiration. To alleviate disk access and serde overhead, we internally use a read/write-through
// ARC cache, the size of which is adjustable via Options.CacheSize.
//
// The user has a choice of two GC algorithms:
//
// - lookahead GC: minimises the amount of full store traversals by maintaining a time-indexed list of entries that
// need to be visited within the period specified in Options.GCLookaheadInterval. This is useful in scenarios with
// considerable TTL variance, coupled with datastores whose native iterators return entries in lexicographical key
// order. Enable this mode by passing a value Options.GCLookaheadInterval > 0. Lookahead windows are jumpy, not
// sliding. Purges operate exclusively over the lookahead window with periodicity Options.GCPurgeInterval.
//
// - full-purge GC (default): performs a full visit of the store with periodicity Options.GCPurgeInterval. Useful when
// the range of possible TTL values is small and the values themselves are also extreme, e.g. 10 minutes or
// permanent, popular values used in other libp2p modules. In this cited case, optimizing with lookahead windows
// makes little sense.
func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAddrBook, err error) {
ctx, cancelFn := context.WithCancel(ctx)
ab = &dsAddrBook{
ctx: ctx,
ds: store,
opts: opts,
cancelFn: cancelFn,
subsManager: pstoremem.NewAddrSubManager(),
clock: realclock{},
}
if opts.Clock != nil {
ab.clock = opts.Clock
}
if opts.CacheSize > 0 {
if ab.cache, err = lru.NewARC[peer.ID, *addrsRecord](int(opts.CacheSize)); err != nil {
return nil, err
}
} else {
ab.cache = new(noopCache[peer.ID, *addrsRecord])
}
if ab.gc, err = newAddressBookGc(ctx, ab); err != nil {
return nil, err
}
return ab, nil
}
func (ab *dsAddrBook) Close() error {
ab.cancelFn()
ab.childrenDone.Wait()
return nil
}
// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the
// datastore upon a miss, and returning a newly initialized record if the peer doesn't exist.
//
// loadRecord calls clean() on an existing record before returning it. If the record changes
// as a result and the update argument is true, the resulting state is saved in the datastore.
//
// If the cache argument is true, the record is inserted in the cache when loaded from the datastore.
func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) {
if pr, ok := ab.cache.Get(id); ok {
pr.Lock()
defer pr.Unlock()
if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
return pr, err
}
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := ab.ds.Get(context.TODO(), key)
switch err {
case ds.ErrNotFound:
err = nil
pr.Id = []byte(id)
case nil:
if err := proto.Unmarshal(data, pr); err != nil {
return nil, err
}
// this record is new and local for now (not in cache), so we don't need to lock.
if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
default:
return nil, err
}
if cache {
ab.cache.Add(id, pr)
}
return pr, err
}
// AddAddr will add a new address if it's not already in the AddrBook.
func (ab *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
return
}
addrs = cleanAddrs(addrs, p)
ab.setAddrs(p, addrs, ttl, ttlExtend, false)
}
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
r, err := recordEnvelope.Record()
if err != nil {
return false, err
}
rec, ok := r.(*peer.PeerRecord)
if !ok {
return false, fmt.Errorf("envelope did not contain PeerRecord")
}
if !rec.PeerID.MatchesPublicKey(recordEnvelope.PublicKey) {
return false, fmt.Errorf("signing key does not match PeerID in PeerRecord")
}
// ensure that the seq number from envelope is >= any previously received seq no
// update when equal to extend the ttls
if ab.latestPeerRecordSeq(rec.PeerID) > rec.Seq {
return false, nil
}
addrs := cleanAddrs(rec.Addrs, rec.PeerID)
err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true)
if err != nil {
return false, err
}
err = ab.storeSignedPeerRecord(rec.PeerID, recordEnvelope, rec)
if err != nil {
return false, err
}
return true, nil
}
func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
// We ignore the error because we don't want to fail storing a new record in this
// case.
log.Errorw("unable to load record", "peer", p, "error", err)
return 0
}
pr.RLock()
defer pr.RUnlock()
if len(pr.Addrs) == 0 || pr.CertifiedRecord == nil || len(pr.CertifiedRecord.Raw) == 0 {
return 0
}
return pr.CertifiedRecord.Seq
}
func (ab *dsAddrBook) storeSignedPeerRecord(p peer.ID, envelope *record.Envelope, rec *peer.PeerRecord) error {
envelopeBytes, err := envelope.Marshal()
if err != nil {
return err
}
// reload record and add routing state
// this has to be done after we add the addresses, since if
// we try to flush a datastore record with no addresses,
// it will just get deleted
pr, err := ab.loadRecord(p, true, false)
if err != nil {
return err
}
pr.Lock()
defer pr.Unlock()
pr.CertifiedRecord = &pb.AddrBookRecord_CertifiedRecord{
Seq: rec.Seq,
Raw: envelopeBytes,
}
pr.dirty = true
err = pr.flush(ab.ds)
return err
}
// GetPeerRecord returns a record.Envelope containing a peer.PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
func (ab *dsAddrBook) GetPeerRecord(p peer.ID) *record.Envelope {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
log.Errorf("unable to load record for peer %s: %v", p.Pretty(), err)
return nil
}
pr.RLock()
defer pr.RUnlock()
if pr.CertifiedRecord == nil || len(pr.CertifiedRecord.Raw) == 0 || len(pr.Addrs) == 0 {
return nil
}
state, _, err := record.ConsumeEnvelope(pr.CertifiedRecord.Raw, peer.PeerRecordEnvelopeDomain)
if err != nil {
log.Errorf("error unmarshaling stored signed peer record for peer %s: %v", p.Pretty(), err)
return nil
}
return state
}
// SetAddr will add or update the TTL of an address in the AddrBook.
func (ab *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (ab *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
addrs = cleanAddrs(addrs, p)
if ttl <= 0 {
ab.deleteAddrs(p, addrs)
return
}
ab.setAddrs(p, addrs, ttl, ttlOverride, false)
}
// UpdateAddrs will update any addresses for a given peer and TTL combination to
// have a new TTL.
func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
log.Errorf("failed to update ttls for peer %s: %s\n", p.Pretty(), err)
return
}
pr.Lock()
defer pr.Unlock()
newExp := ab.clock.Now().Add(newTTL).Unix()
for _, entry := range pr.Addrs {
if entry.Ttl != int64(oldTTL) {
continue
}
entry.Ttl, entry.Expiry = int64(newTTL), newExp
pr.dirty = true
}
if pr.clean(ab.clock.Now()) {
pr.flush(ab.ds)
}
}
// Addrs returns all of the non-expired addresses for a given peer.
func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
pr, err := ab.loadRecord(p, true, true)
if err != nil {
log.Warn("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
return nil
}
pr.RLock()
defer pr.RUnlock()
addrs := make([]ma.Multiaddr, len(pr.Addrs))
for i, a := range pr.Addrs {
var err error
addrs[i], err = ma.NewMultiaddrBytes(a.Addr)
if err != nil {
log.Warn("failed to parse peerstore entry for peer %v while querying addrs, err: %v", p, err)
return nil
}
}
return addrs
}
// Peers returns all of the peer IDs for which the AddrBook has addresses.
func (ab *dsAddrBook) PeersWithAddrs() peer.IDSlice {
ids, err := uniquePeerIds(ab.ds, addrBookBase, func(result query.Result) string {
return ds.RawKey(result.Key).Name()
})
if err != nil {
log.Errorf("error while retrieving peers with addresses: %v", err)
}
return ids
}
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (ab *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := ab.Addrs(p)
return ab.subsManager.AddrStream(ctx, p, initial)
}
// ClearAddrs will delete all known addresses for a peer ID.
func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p)
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
if err := ab.ds.Delete(context.TODO(), key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err)
}
}
func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode, signed bool) (err error) {
if len(addrs) == 0 {
return nil
}
pr, err := ab.loadRecord(p, true, false)
if err != nil {
return fmt.Errorf("failed to load peerstore entry for peer %v while setting addrs, err: %v", p, err)
}
pr.Lock()
defer pr.Unlock()
// // if we have a signed PeerRecord, ignore attempts to add unsigned addrs
// if !signed && pr.CertifiedRecord != nil {
// return nil
// }
newExp := ab.clock.Now().Add(ttl).Unix()
addrsMap := make(map[string]*pb.AddrBookRecord_AddrEntry, len(pr.Addrs))
for _, addr := range pr.Addrs {
addrsMap[string(addr.Addr)] = addr
}
updateExisting := func(incoming ma.Multiaddr) *pb.AddrBookRecord_AddrEntry {
existingEntry := addrsMap[string(incoming.Bytes())]
if existingEntry == nil {
return nil
}
switch mode {
case ttlOverride:
existingEntry.Ttl = int64(ttl)
existingEntry.Expiry = newExp
case ttlExtend:
if int64(ttl) > existingEntry.Ttl {
existingEntry.Ttl = int64(ttl)
}
if newExp > existingEntry.Expiry {
existingEntry.Expiry = newExp
}
default:
panic("BUG: unimplemented ttl mode")
}
return existingEntry
}
var entries []*pb.AddrBookRecord_AddrEntry
for _, incoming := range addrs {
existingEntry := updateExisting(incoming)
if existingEntry == nil {
// if signed {
// entries = append(entries, existingEntry)
// }
// } else {
// new addr, add & broadcast
entry := &pb.AddrBookRecord_AddrEntry{
Addr: incoming.Bytes(),
Ttl: int64(ttl),
Expiry: newExp,
}
entries = append(entries, entry)
// note: there's a minor chance that writing the record will fail, in which case we would've broadcast
// the addresses without persisting them. This is very unlikely and not much of an issue.
ab.subsManager.BroadcastAddr(p, incoming)
}
}
// if signed {
// // when adding signed addrs, we want to keep _only_ the incoming addrs
// pr.Addrs = entries
// } else {
pr.Addrs = append(pr.Addrs, entries...)
// }
pr.dirty = true
pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}
// deletes addresses in place, avoiding copies until we encounter the first deletion.
// does not preserve order, but entries are re-sorted before flushing to disk anyway.
func deleteInPlace(s []*pb.AddrBookRecord_AddrEntry, addrs []ma.Multiaddr) []*pb.AddrBookRecord_AddrEntry {
if s == nil || len(addrs) == 0 {
return s
}
survived := len(s)
Outer:
for i, addr := range s {
for _, del := range addrs {
if !bytes.Equal(del.Bytes(), addr.Addr) {
continue
}
survived--
// if there are no survivors, bail out
if survived == 0 {
break Outer
}
s[i] = s[survived]
// we've already dealt with s[i], move to the next
continue Outer
}
}
return s[:survived]
}
func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
pr, err := ab.loadRecord(p, false, false)
if err != nil {
return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err)
}
pr.Lock()
defer pr.Unlock()
if pr.Addrs == nil {
return nil
}
pr.Addrs = deleteInPlace(pr.Addrs, addrs)
pr.dirty = true
pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}
func cleanAddrs(addrs []ma.Multiaddr, pid peer.ID) []ma.Multiaddr {
clean := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
// Remove suffix of /p2p/peer-id from address
addr, addrPid := peer.SplitAddr(addr)
if addr == nil {
log.Warnw("Was passed a nil multiaddr", "peer", pid)
continue
}
if addrPid != "" && addrPid != pid {
log.Warnf("Was passed p2p address with a different peerId. found: %s, expected: %s", addrPid, pid)
continue
}
clean = append(clean, addr)
}
return clean
}
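A brief usage sketch for the address book above, assuming the package's DefaultOpts and the in-memory leveldb datastore used by the tests further down; somePeerID and someAddr are placeholders.

// Hypothetical usage sketch: a lookahead interval > 0 selects lookahead GC,
// as described in the NewAddrBook comment; 0 keeps the default full-purge GC.
store, _ := leveldb.NewDatastore("", nil) // in-memory, as in the tests below
opts := DefaultOpts()
opts.CacheSize = 1024
opts.GCPurgeInterval = 30 * time.Second
opts.GCLookaheadInterval = 5 * time.Minute
ab, err := NewAddrBook(context.Background(), store, opts)
if err != nil {
	panic(err)
}
defer ab.Close()
ab.AddAddr(somePeerID, someAddr, time.Hour) // expires after the TTL unless refreshed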

View File

@@ -0,0 +1,408 @@
package pstoreds
import (
"context"
"fmt"
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds/pb"
"google.golang.org/protobuf/proto"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
b32 "github.com/multiformats/go-base32"
)
var (
// GC lookahead entries are stored in key pattern:
// /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32> => nil
// in databases with lexicographical key order, this time-indexing allows us to visit
// only the timeslice we are interested in.
gcLookaheadBase = ds.NewKey("/peers/gc/addrs")
// queries
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
Orders: []query.Order{query.OrderByFunction(orderByTimestampInKey)},
KeysOnly: true,
}
purgeStoreQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: false,
}
populateLookaheadQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: true,
}
)
// dsAddrBookGc is responsible for garbage collection in a datastore-backed address book.
type dsAddrBookGc struct {
ctx context.Context
ab *dsAddrBook
running chan struct{}
lookaheadEnabled bool
purgeFunc func()
currWindowEnd int64
}
func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error) {
if ab.opts.GCPurgeInterval < 0 {
return nil, fmt.Errorf("negative GC purge interval provided: %s", ab.opts.GCPurgeInterval)
}
if ab.opts.GCLookaheadInterval < 0 {
return nil, fmt.Errorf("negative GC lookahead interval provided: %s", ab.opts.GCLookaheadInterval)
}
if ab.opts.GCInitialDelay < 0 {
return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay)
}
if ab.opts.GCLookaheadInterval > 0 && ab.opts.GCLookaheadInterval < ab.opts.GCPurgeInterval {
return nil, fmt.Errorf("lookahead interval must be larger than purge interval, respectively: %s, %s",
ab.opts.GCLookaheadInterval, ab.opts.GCPurgeInterval)
}
lookaheadEnabled := ab.opts.GCLookaheadInterval > 0
gc := &dsAddrBookGc{
ctx: ctx,
ab: ab,
running: make(chan struct{}, 1),
lookaheadEnabled: lookaheadEnabled,
}
if lookaheadEnabled {
gc.purgeFunc = gc.purgeLookahead
} else {
gc.purgeFunc = gc.purgeStore
}
// do not start GC timers if purge is disabled; this GC can only be triggered manually.
if ab.opts.GCPurgeInterval > 0 {
gc.ab.childrenDone.Add(1)
go gc.background()
}
return gc, nil
}
// gc prunes expired addresses from the datastore at regular intervals. It should be spawned as a goroutine.
func (gc *dsAddrBookGc) background() {
defer gc.ab.childrenDone.Done()
select {
case <-gc.ab.clock.After(gc.ab.opts.GCInitialDelay):
case <-gc.ab.ctx.Done():
// yield if we have been cancelled/closed before the delay elapses.
return
}
purgeTimer := time.NewTicker(gc.ab.opts.GCPurgeInterval)
defer purgeTimer.Stop()
var lookaheadCh <-chan time.Time
if gc.lookaheadEnabled {
lookaheadTimer := time.NewTicker(gc.ab.opts.GCLookaheadInterval)
lookaheadCh = lookaheadTimer.C
gc.populateLookahead() // do a lookahead now
defer lookaheadTimer.Stop()
}
for {
select {
case <-purgeTimer.C:
gc.purgeFunc()
case <-lookaheadCh:
// will never trigger if lookahead is disabled (nil Duration).
gc.populateLookahead()
case <-gc.ctx.Done():
return
}
}
}
// purgeCycle runs a single GC purge cycle. It operates within the lookahead window if lookahead is enabled; else it
// visits all entries in the datastore, deleting the addresses that have expired.
func (gc *dsAddrBookGc) purgeLookahead() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if lookahead is running.
return
}
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}
// This function drops an unparseable GC entry; this is for safety. It is an escape hatch in case
// we modify the format of keys going forward. If a user runs a new version against an old DB
// and we don't clean up unparseable entries, we'll end up accumulating garbage.
dropInError := func(key ds.Key, err error, msg string) {
if err != nil {
log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
}
if err = batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
}
}
// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err)
}
// re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err)
}
}
}
results, err := gc.ab.ds.Query(context.TODO(), purgeLookaheadQuery)
if err != nil {
log.Warnf("failed while fetching entries to purge: %v", err)
return
}
defer results.Close()
now := gc.ab.clock.Now().Unix()
// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
// values: nil
for result := range results.Next() {
gcKey := ds.RawKey(result.Key)
ts, err := strconv.ParseInt(gcKey.Parent().Name(), 10, 64)
if err != nil {
dropInError(gcKey, err, "parsing timestamp")
log.Warnf("failed while parsing timestamp from key: %v, err: %v", result.Key, err)
continue
} else if ts > now {
// this is an ordered cursor; when we hit an entry with a timestamp beyond now, we can break.
break
}
idb32, err := b32.RawStdEncoding.DecodeString(gcKey.Name())
if err != nil {
dropInError(gcKey, err, "parsing peer ID")
log.Warnf("failed while parsing b32 peer ID from key: %v, err: %v", result.Key, err)
continue
}
id, err = peer.IDFromBytes(idb32)
if err != nil {
dropInError(gcKey, err, "decoding peer ID")
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
// if the record is in cache, we clean it and flush it if necessary.
if cached, ok := gc.ab.cache.Peek(id); ok {
cached.Lock()
if cached.clean(gc.ab.clock.Now()) {
if err = cached.flush(batch); err != nil {
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, cached)
cached.Unlock()
continue
}
record.Reset()
// otherwise, fetch it from the store, clean it and flush it.
entryKey := addrBookBase.ChildString(gcKey.Name())
val, err := gc.ab.ds.Get(context.TODO(), entryKey)
if err != nil {
// captures all errors, including ErrNotFound.
dropInError(gcKey, err, "fetching entry")
continue
}
err = proto.Unmarshal(val, record)
if err != nil {
dropInError(gcKey, err, "unmarshalling entry")
continue
}
if record.clean(gc.ab.clock.Now()) {
err = record.flush(batch)
if err != nil {
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, record)
}
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
func (gc *dsAddrBookGc) purgeStore() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if lookahead is running.
return
}
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warnf("failed while creating batch to purge GC entries: %v", err)
}
results, err := gc.ab.ds.Query(context.TODO(), purgeStoreQuery)
if err != nil {
log.Warnf("failed while opening iterator: %v", err)
return
}
defer results.Close()
// keys: /peers/addrs/<peer ID b32>
for result := range results.Next() {
record.Reset()
if err = proto.Unmarshal(result.Value, record); err != nil {
// TODO log
continue
}
id := record.Id
if !record.clean(gc.ab.clock.Now()) {
continue
}
if err := record.flush(batch); err != nil {
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id, err)
}
gc.ab.cache.Remove(peer.ID(id))
}
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err)
}
}
// populateLookahead populates the lookahead window by scanning the entire store and picking entries whose earliest
// expiration falls within the window period.
//
// Those entries are stored in the lookahead region in the store, indexed by the timestamp when they need to be
// visited, to facilitate temporal range scans.
func (gc *dsAddrBookGc) populateLookahead() {
if gc.ab.opts.GCLookaheadInterval == 0 {
return
}
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if something's running.
return
}
until := gc.ab.clock.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(context.TODO(), populateLookaheadQuery)
if err != nil {
log.Warnf("failed while querying to populate lookahead GC window: %v", err)
return
}
defer results.Close()
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warnf("failed while creating batch to populate lookahead GC window: %v", err)
return
}
for result := range results.Next() {
idb32 := ds.RawKey(result.Key).Name()
k, err := b32.RawStdEncoding.DecodeString(idb32)
if err != nil {
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
if id, err = peer.IDFromBytes(k); err != nil {
log.Warnf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
}
// if the record is in cache, use the cached version.
if cached, ok := gc.ab.cache.Peek(id); ok {
cached.RLock()
if len(cached.Addrs) == 0 || cached.Addrs[0].Expiry > until {
cached.RUnlock()
continue
}
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
cached.RUnlock()
continue
}
record.Reset()
val, err := gc.ab.ds.Get(context.TODO(), ds.RawKey(result.Key))
if err != nil {
log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if err := proto.Unmarshal(val, record); err != nil {
log.Warnf("failed while unmarshalling record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
}
}
if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC lookahead batch: %v", err)
}
gc.currWindowEnd = until
}
// orderByTimestampInKey orders the results by comparing the timestamp in the
// key. A lexicographic sort by itself is wrong since "10" is less than "2", but
// as an int 2 is obviously less than 10.
func orderByTimestampInKey(a, b query.Entry) int {
aKey := ds.RawKey(a.Key)
aInt, err := strconv.ParseInt(aKey.Parent().Name(), 10, 64)
if err != nil {
return -1
}
bKey := ds.RawKey(b.Key)
bInt, err := strconv.ParseInt(bKey.Parent().Name(), 10, 64)
if err != nil {
return -1
}
if aInt < bInt {
return -1
} else if aInt == bInt {
return 0
}
return 1
}

View File

@@ -0,0 +1,264 @@
package pstoreds
import (
"context"
"testing"
"time"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
mockClock "github.com/benbjohnson/clock"
"github.com/ipfs/go-datastore/query"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
var lookaheadQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true}
type testProbe struct {
t *testing.T
ab pstore.AddrBook
}
func (tp *testProbe) countLookaheadEntries() (i int) {
results, err := tp.ab.(*dsAddrBook).ds.Query(context.Background(), lookaheadQuery)
if err != nil {
tp.t.Fatal(err)
}
defer results.Close()
for range results.Next() {
i++
}
return i
}
func (tp *testProbe) clearCache() {
for _, k := range tp.ab.(*dsAddrBook).cache.Keys() {
tp.ab.(*dsAddrBook).cache.Remove(k)
}
}
func TestGCLookahead(t *testing.T) {
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 10 * time.Second
opts.GCPurgeInterval = 1 * time.Second
factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
gc := ab.(*dsAddrBook).gc
defer closeFn()
tp := &testProbe{t, ab}
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
// lookahead is 10 seconds, so these entries will be outside the lookahead window.
ab.AddAddrs(ids[0], addrs[:10], time.Hour)
ab.AddAddrs(ids[1], addrs[10:20], time.Hour)
ab.AddAddrs(ids[2], addrs[20:30], time.Hour)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}
// change addresses of a peer to have TTL 1 second, placing them in the lookahead window.
ab.UpdateAddrs(ids[1], time.Hour, time.Second)
// Purge the cache, to exercise a different path in the lookahead cycle.
tp.clearCache()
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entry, got: %v", i)
}
// change addresses of another peer to have TTL 5 seconds, placing them in the lookahead window.
ab.UpdateAddrs(ids[2], time.Hour, 5*time.Second)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 2 {
t.Errorf("expected 2 GC lookahead entries, got: %v", i)
}
}
func TestGCPurging(t *testing.T) {
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 20 * time.Second
opts.GCPurgeInterval = 1 * time.Second
clk := mockClock.NewMock()
opts.Clock = clk
factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
gc := ab.(*dsAddrBook).gc
defer closeFn()
tp := &testProbe{t, ab}
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
// add addresses within the lookahead window, but stagger their expiries.
ab.AddAddrs(ids[0], addrs[:10], 1*time.Second)
ab.AddAddrs(ids[1], addrs[30:40], 1*time.Second)
ab.AddAddrs(ids[2], addrs[60:70], 1*time.Second)
ab.AddAddrs(ids[0], addrs[10:20], 4*time.Second)
ab.AddAddrs(ids[1], addrs[40:50], 4*time.Second)
ab.AddAddrs(ids[0], addrs[20:30], 10*time.Second)
ab.AddAddrs(ids[1], addrs[50:60], 10*time.Second)
// this is inside the window, but it will survive the purges we do in the test.
ab.AddAddrs(ids[3], addrs[70:80], 15*time.Second)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 4 {
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
}
clk.Add(2 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
// Purge the cache, to exercise a different path in the purge cycle.
tp.clearCache()
clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
}
if i := len(ab.PeersWithAddrs()); i != 1 {
t.Errorf("expected 1 entries in database, got: %v", i)
}
if p := ab.PeersWithAddrs()[0]; p != ids[3] {
t.Errorf("expected remaining peer to be #3, got: %v, expected: %v", p, ids[3])
}
}
func TestGCDelay(t *testing.T) {
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
clk := mockClock.NewMock()
opts := DefaultOpts()
opts.GCInitialDelay = 2 * time.Second
opts.GCLookaheadInterval = 1 * time.Minute
opts.GCPurgeInterval = 30 * time.Second
opts.Clock = clk
factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
defer closeFn()
// give the background Go routine some time to start
time.Sleep(100 * time.Millisecond)
tp := &testProbe{t, ab}
ab.AddAddrs(ids[0], addrs, 1*time.Second)
// immediately after, we should have no lookahead entries.
if i := tp.countLookaheadEntries(); i != 0 {
t.Fatalf("expected no lookahead entries, got: %d", i)
}
// after the initial delay has passed.
clk.Add(3 * time.Second)
require.Eventually(t, func() bool { return tp.countLookaheadEntries() == 1 }, 3000*time.Millisecond, 10*time.Millisecond, "expected 1 lookahead entry")
}
func TestGCLookaheadDisabled(t *testing.T) {
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 0 // disable lookahead
opts.GCPurgeInterval = 9 * time.Hour
clk := mockClock.NewMock()
opts.Clock = clk
factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
defer closeFn()
tp := &testProbe{t, ab}
// four peers:
// ids[0] has 10 addresses, all of which expire in 500ms.
// ids[1] has 20 addresses; 50% expire in 500ms and 50% in 10 hours.
// ids[2] has 10 addresses; all expire in 10 hours.
// ids[3] has 60 addresses; all expire in 10 hours.
ab.AddAddrs(ids[0], addrs[:10], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[10:20], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[20:30], 10*time.Hour)
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)
clk.Add(100 * time.Millisecond)
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}
clk.Add(500 * time.Millisecond)
gc := ab.(*dsAddrBook).gc
gc.purgeFunc()
var empty []ma.Multiaddr
test.AssertAddressesEqual(t, empty, ab.Addrs(ids[0]))
test.AssertAddressesEqual(t, addrs[20:30], ab.Addrs(ids[1]))
test.AssertAddressesEqual(t, addrs[30:40], ab.Addrs(ids[2]))
test.AssertAddressesEqual(t, addrs[40:], ab.Addrs(ids[3]))
}
func BenchmarkLookaheadCycle(b *testing.B) {
ids := test.GeneratePeerIDs(100)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
opts.GCInitialDelay = 2 * time.Hour
opts.GCLookaheadInterval = 2 * time.Hour
opts.GCPurgeInterval = 6 * time.Hour
factory := addressBookFactory(b, leveldbStore, opts)
ab, closeFn := factory()
defer closeFn()
inside, outside := 1*time.Minute, 48*time.Hour
for i, id := range ids {
var ttl time.Duration
if i%2 == 0 {
ttl = inside
} else {
ttl = outside
}
ab.AddAddrs(id, addrs, ttl)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ab.(*dsAddrBook).gc.populateLookahead()
}
}

View File

@@ -0,0 +1,40 @@
package pstoreds
// cache abstracts all methods we access from ARCCache, to enable alternate
// implementations such as a no-op one.
type cache[K comparable, V any] interface {
Get(key K) (value V, ok bool)
Add(key K, value V)
Remove(key K)
Contains(key K) bool
Peek(key K) (value V, ok bool)
Keys() []K
}
// noopCache is a dummy implementation that's used when the cache is disabled.
type noopCache[K comparable, V any] struct {
}
var _ cache[int, int] = (*noopCache[int, int])(nil)
func (*noopCache[K, V]) Get(key K) (value V, ok bool) {
return *new(V), false
}
func (*noopCache[K, V]) Add(key K, value V) {
}
func (*noopCache[K, V]) Remove(key K) {
}
func (*noopCache[K, V]) Contains(key K) bool {
return false
}
func (*noopCache[K, V]) Peek(key K) (value V, ok bool) {
return *new(V), false
}
func (*noopCache[K, V]) Keys() (keys []K) {
return keys
}

View File

@@ -0,0 +1,78 @@
package pstoreds
import (
"context"
"errors"
"fmt"
ds "github.com/ipfs/go-datastore"
)
// how many operations are queued in a cyclic batch before we flush it.
var defaultOpsPerCyclicBatch = 20
// cyclicBatch buffers ds write operations and automatically flushes them after defaultOpsPerCyclicBatch (20) have been
// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations.
//
// It is similar to go-ds autobatch, but it's driven by an actual Batch facility offered by the
// ds.
type cyclicBatch struct {
threshold int
ds.Batch
ds ds.Batching
pending int
}
func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch(context.TODO())
if err != nil {
return nil, err
}
return &cyclicBatch{Batch: batch, ds: ds, threshold: threshold}, nil
}
func (cb *cyclicBatch) cycle() (err error) {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if cb.pending < cb.threshold {
// we haven't reached the threshold yet.
return nil
}
// commit and renew the batch.
if err = cb.Batch.Commit(context.TODO()); err != nil {
return fmt.Errorf("failed while committing cyclic batch: %w", err)
}
if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil {
return fmt.Errorf("failed while renewing cyclic batch: %w", err)
}
return nil
}
func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Put(ctx, key, val)
}
func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Delete(ctx, key)
}
func (cb *cyclicBatch) Commit(ctx context.Context) error {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if err := cb.Batch.Commit(ctx); err != nil {
return err
}
cb.pending = 0
cb.Batch = nil
return nil
}
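A short usage sketch of the cyclic batch above; writeAll, store, keys and vals are placeholders.

// Hypothetical usage sketch: writes are buffered and auto-committed every
// defaultOpsPerCyclicBatch operations; Commit flushes the remainder and
// closes the batch.
func writeAll(store ds.Batching, keys []ds.Key, vals [][]byte) error {
	batch, err := newCyclicBatch(store, defaultOpsPerCyclicBatch)
	if err != nil {
		return err
	}
	for i, k := range keys {
		if err = batch.Put(context.TODO(), k, vals[i]); err != nil {
			return err
		}
	}
	return batch.Commit(context.TODO())
}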

View File

@@ -0,0 +1,173 @@
package pstoreds
import (
"context"
"os"
"testing"
"time"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
mockClock "github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/stretchr/testify/require"
)
type datastoreFactory func(tb testing.TB) (ds.Batching, func())
var dstores = map[string]datastoreFactory{
// "Badger": badgerStore,
"Leveldb": leveldbStore,
}
func TestDsPeerstore(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
pt.TestPeerstore(t, peerstoreFactory(t, dsFactory, DefaultOpts()))
})
t.Run("protobook limits", func(t *testing.T) {
const limit = 10
opts := DefaultOpts()
opts.MaxProtocols = limit
ds, close := dsFactory(t)
defer close()
ps, err := NewPeerstore(context.Background(), ds, opts)
require.NoError(t, err)
defer ps.Close()
pt.TestPeerstoreProtoStoreLimits(t, ps, limit)
})
}
}
func TestDsAddrBook(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name+" Cacheful", func(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 1024
clk := mockClock.NewMock()
opts.Clock = clk
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})
t.Run(name+" Cacheless", func(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 0
clk := mockClock.NewMock()
opts.Clock = clk
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})
}
}
func TestDsKeyBook(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
pt.TestKeyBook(t, keyBookFactory(t, dsFactory, DefaultOpts()))
})
}
}
func BenchmarkDsKeyBook(b *testing.B) {
for name, dsFactory := range dstores {
b.Run(name, func(b *testing.B) {
pt.BenchmarkKeyBook(b, keyBookFactory(b, dsFactory, DefaultOpts()))
})
}
}
func BenchmarkDsPeerstore(b *testing.B) {
caching := DefaultOpts()
caching.CacheSize = 1024
cacheless := DefaultOpts()
cacheless.CacheSize = 0
for name, dsFactory := range dstores {
b.Run(name, func(b *testing.B) {
pt.BenchmarkPeerstore(b, peerstoreFactory(b, dsFactory, caching), "Caching")
})
b.Run(name, func(b *testing.B) {
pt.BenchmarkPeerstore(b, peerstoreFactory(b, dsFactory, cacheless), "Cacheless")
})
}
}
// Doesn't work on 32bit because badger.
//
//lint:ignore U1000 disabled for now
func badgerStore(tb testing.TB) (ds.Batching, func()) {
dataPath, err := os.MkdirTemp(os.TempDir(), "badger")
if err != nil {
tb.Fatal(err)
}
store, err := badger.NewDatastore(dataPath, nil)
if err != nil {
tb.Fatal(err)
}
closer := func() {
store.Close()
os.RemoveAll(dataPath)
}
return store, closer
}
func leveldbStore(tb testing.TB) (ds.Batching, func()) {
// Intentionally test in-memory because disks suck, especially in CI.
store, err := leveldb.NewDatastore("", nil)
if err != nil {
tb.Fatal(err)
}
closer := func() {
store.Close()
}
return store, closer
}
func peerstoreFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.PeerstoreFactory {
return func() (pstore.Peerstore, func()) {
store, storeCloseFn := storeFactory(tb)
ps, err := NewPeerstore(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
closer := func() {
ps.Close()
storeCloseFn()
}
return ps, closer
}
}
func addressBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.AddrBookFactory {
return func() (pstore.AddrBook, func()) {
store, closeFunc := storeFactory(tb)
ab, err := NewAddrBook(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
closer := func() {
ab.Close()
closeFunc()
}
return ab, closer
}
}
func keyBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.KeyBookFactory {
return func() (pstore.KeyBook, func()) {
store, storeCloseFn := storeFactory(tb)
kb, err := NewKeyBook(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
return kb, storeCloseFn
}
}

View File

@@ -0,0 +1,136 @@
package pstoreds
import (
"context"
"errors"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-base32"
)
// Public and private keys are stored under the following db key pattern:
// /peers/keys/<b32 peer id no padding>/{pub, priv}
var (
kbBase = ds.NewKey("/peers/keys")
pubSuffix = ds.NewKey("/pub")
privSuffix = ds.NewKey("/priv")
)
type dsKeyBook struct {
ds ds.Datastore
}
var _ pstore.KeyBook = (*dsKeyBook)(nil)
func NewKeyBook(_ context.Context, store ds.Datastore, _ Options) (*dsKeyBook, error) {
return &dsKeyBook{store}, nil
}
func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
key := peerToKey(p, pubSuffix)
var pk ic.PubKey
if value, err := kb.ds.Get(context.TODO(), key); err == nil {
pk, err = ic.UnmarshalPublicKey(value)
if err != nil {
log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
}
} else if err == ds.ErrNotFound {
pk, err = p.ExtractPublicKey()
switch err {
case nil:
case peer.ErrNoPublicKey:
return nil
default:
log.Errorf("error when extracting pubkey from peer ID for peer %s: %s\n", p.Pretty(), err)
return nil
}
pkb, err := ic.MarshalPublicKey(pk)
if err != nil {
log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err)
return nil
}
if err := kb.ds.Put(context.TODO(), key, pkb); err != nil {
log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err)
return nil
}
} else {
log.Errorf("error when fetching pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
}
return pk
}
func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
// check it's correct.
if !p.MatchesPublicKey(pk) {
return errors.New("peer ID does not match public key")
}
val, err := ic.MarshalPublicKey(pk)
if err != nil {
log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
if err := kb.ds.Put(context.TODO(), peerToKey(p, pubSuffix), val); err != nil {
log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err)
return err
}
return nil
}
func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey {
value, err := kb.ds.Get(context.TODO(), peerToKey(p, privSuffix))
if err != nil {
return nil
}
sk, err := ic.UnmarshalPrivateKey(value)
if err != nil {
return nil
}
return sk
}
func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
if sk == nil {
return errors.New("private key is nil")
}
// check it's correct.
if !p.MatchesPrivateKey(sk) {
return errors.New("peer ID does not match private key")
}
val, err := ic.MarshalPrivateKey(sk)
if err != nil {
log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
if err := kb.ds.Put(context.TODO(), peerToKey(p, privSuffix), val); err != nil {
log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
return err
}
func (kb *dsKeyBook) PeersWithKeys() peer.IDSlice {
ids, err := uniquePeerIds(kb.ds, kbBase, func(result query.Result) string {
return ds.RawKey(result.Key).Parent().Name()
})
if err != nil {
log.Errorf("error while retrieving peers with keys: %v", err)
}
return ids
}
func (kb *dsKeyBook) RemovePeer(p peer.ID) {
kb.ds.Delete(context.TODO(), peerToKey(p, privSuffix))
kb.ds.Delete(context.TODO(), peerToKey(p, pubSuffix))
}
func peerToKey(p peer.ID, suffix ds.Key) ds.Key {
return kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(suffix)
}

View File

@@ -0,0 +1,82 @@
package pstoreds
import (
"bytes"
"context"
"encoding/gob"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-base32"
)
// Metadata is stored under the following db key pattern:
// /peers/metadata/<b32 peer id no padding>/<key>
var pmBase = ds.NewKey("/peers/metadata")
type dsPeerMetadata struct {
ds ds.Datastore
}
var _ pstore.PeerMetadata = (*dsPeerMetadata)(nil)
func init() {
// Gob registers basic types by default.
//
// Register complex types used by the peerstore itself.
gob.Register(make(map[protocol.ID]struct{}))
}
// NewPeerMetadata creates a metadata store backed by a persistent db. It uses gob for serialisation.
//
// See `init()` to learn which types are registered by default. Modules wishing to store
// values of other types will need to `gob.Register()` them explicitly, or else callers
// will receive runtime errors.
func NewPeerMetadata(_ context.Context, store ds.Datastore, _ Options) (*dsPeerMetadata, error) {
return &dsPeerMetadata{store}, nil
}
func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key)
value, err := pm.ds.Get(context.TODO(), k)
if err != nil {
if err == ds.ErrNotFound {
err = pstore.ErrNotFound
}
return nil, err
}
var res interface{}
if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&res); err != nil {
return nil, err
}
return res, nil
}
func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key)
var buf pool.Buffer
if err := gob.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
return pm.ds.Put(context.TODO(), k, buf.Bytes())
}
func (pm *dsPeerMetadata) RemovePeer(p peer.ID) {
result, err := pm.ds.Query(context.TODO(), query.Query{
Prefix: pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).String(),
KeysOnly: true,
})
if err != nil {
log.Warnw("querying datastore when removing peer failed", "peer", p, "error", err)
return
}
for entry := range result.Next() {
pm.ds.Delete(context.TODO(), ds.NewKey(entry.Key))
}
}
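As the NewPeerMetadata comment above notes, values of non-basic types must be registered with gob before they can be stored; a small illustration, in which HopCount, storeHopCount and the key name are hypothetical.

// Hypothetical usage sketch: register a custom type, then round-trip it
// through the metadata store.
type HopCount struct{ Hops int }

func storeHopCount(store ds.Datastore, p peer.ID) error {
	gob.Register(HopCount{}) // required, or encoding/decoding fails at runtime
	pm, err := NewPeerMetadata(context.Background(), store, Options{})
	if err != nil {
		return err
	}
	if err = pm.Put(p, "example/hop-count", HopCount{Hops: 3}); err != nil {
		return err
	}
	v, err := pm.Get(p, "example/hop-count")
	if err != nil {
		return err
	}
	_ = v.(HopCount) // gob returns the registered concrete type
	return nil
}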

View File

@@ -0,0 +1,331 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v3.21.12
// source: pb/pstore.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// AddrBookRecord represents a record for a peer in the address book.
type AddrBookRecord struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// The peer ID.
Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// The multiaddresses. This is a sorted list where element 0 expires the soonest.
Addrs []*AddrBookRecord_AddrEntry `protobuf:"bytes,2,rep,name=addrs,proto3" json:"addrs,omitempty"`
// The most recently received signed PeerRecord.
CertifiedRecord *AddrBookRecord_CertifiedRecord `protobuf:"bytes,3,opt,name=certified_record,json=certifiedRecord,proto3" json:"certified_record,omitempty"`
}
func (x *AddrBookRecord) Reset() {
*x = AddrBookRecord{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pstore_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AddrBookRecord) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AddrBookRecord) ProtoMessage() {}
func (x *AddrBookRecord) ProtoReflect() protoreflect.Message {
mi := &file_pb_pstore_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddrBookRecord.ProtoReflect.Descriptor instead.
func (*AddrBookRecord) Descriptor() ([]byte, []int) {
return file_pb_pstore_proto_rawDescGZIP(), []int{0}
}
func (x *AddrBookRecord) GetId() []byte {
if x != nil {
return x.Id
}
return nil
}
func (x *AddrBookRecord) GetAddrs() []*AddrBookRecord_AddrEntry {
if x != nil {
return x.Addrs
}
return nil
}
func (x *AddrBookRecord) GetCertifiedRecord() *AddrBookRecord_CertifiedRecord {
if x != nil {
return x.CertifiedRecord
}
return nil
}
// AddrEntry represents a single multiaddress.
type AddrBookRecord_AddrEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Addr []byte `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
// The point in time when this address expires.
Expiry int64 `protobuf:"varint,2,opt,name=expiry,proto3" json:"expiry,omitempty"`
// The original TTL of this address.
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
}
func (x *AddrBookRecord_AddrEntry) Reset() {
*x = AddrBookRecord_AddrEntry{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pstore_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AddrBookRecord_AddrEntry) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AddrBookRecord_AddrEntry) ProtoMessage() {}
func (x *AddrBookRecord_AddrEntry) ProtoReflect() protoreflect.Message {
mi := &file_pb_pstore_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddrBookRecord_AddrEntry.ProtoReflect.Descriptor instead.
func (*AddrBookRecord_AddrEntry) Descriptor() ([]byte, []int) {
return file_pb_pstore_proto_rawDescGZIP(), []int{0, 0}
}
func (x *AddrBookRecord_AddrEntry) GetAddr() []byte {
if x != nil {
return x.Addr
}
return nil
}
func (x *AddrBookRecord_AddrEntry) GetExpiry() int64 {
if x != nil {
return x.Expiry
}
return 0
}
func (x *AddrBookRecord_AddrEntry) GetTtl() int64 {
if x != nil {
return x.Ttl
}
return 0
}
// CertifiedRecord contains a serialized signed PeerRecord used to
// populate the signedAddrs list.
type AddrBookRecord_CertifiedRecord struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// The Seq counter from the signed PeerRecord envelope
Seq uint64 `protobuf:"varint,1,opt,name=seq,proto3" json:"seq,omitempty"`
// The serialized bytes of the SignedEnvelope containing the PeerRecord.
Raw []byte `protobuf:"bytes,2,opt,name=raw,proto3" json:"raw,omitempty"`
}
func (x *AddrBookRecord_CertifiedRecord) Reset() {
*x = AddrBookRecord_CertifiedRecord{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pstore_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AddrBookRecord_CertifiedRecord) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AddrBookRecord_CertifiedRecord) ProtoMessage() {}
func (x *AddrBookRecord_CertifiedRecord) ProtoReflect() protoreflect.Message {
mi := &file_pb_pstore_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddrBookRecord_CertifiedRecord.ProtoReflect.Descriptor instead.
func (*AddrBookRecord_CertifiedRecord) Descriptor() ([]byte, []int) {
return file_pb_pstore_proto_rawDescGZIP(), []int{0, 1}
}
func (x *AddrBookRecord_CertifiedRecord) GetSeq() uint64 {
if x != nil {
return x.Seq
}
return 0
}
func (x *AddrBookRecord_CertifiedRecord) GetRaw() []byte {
if x != nil {
return x.Raw
}
return nil
}
var File_pb_pstore_proto protoreflect.FileDescriptor
var file_pb_pstore_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x09, 0x70, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x62, 0x22, 0xb3, 0x02, 0x0a,
0x0e, 0x41, 0x64, 0x64, 0x72, 0x42, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12,
0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12,
0x39, 0x0a, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23,
0x2e, 0x70, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x42,
0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x2e, 0x41, 0x64, 0x64, 0x72, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x52, 0x05, 0x61, 0x64, 0x64, 0x72, 0x73, 0x12, 0x54, 0x0a, 0x10, 0x63, 0x65,
0x72, 0x74, 0x69, 0x66, 0x69, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x70, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x62,
0x2e, 0x41, 0x64, 0x64, 0x72, 0x42, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x2e,
0x43, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x65, 0x64, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52,
0x0f, 0x63, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x65, 0x64, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64,
0x1a, 0x49, 0x0a, 0x09, 0x41, 0x64, 0x64, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a,
0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x64, 0x64,
0x72, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28,
0x03, 0x52, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74, 0x6c,
0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x1a, 0x35, 0x0a, 0x0f, 0x43,
0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x65, 0x64, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x10,
0x0a, 0x03, 0x73, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x73, 0x65, 0x71,
0x12, 0x10, 0x0a, 0x03, 0x72, 0x61, 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x72,
0x61, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_pstore_proto_rawDescOnce sync.Once
file_pb_pstore_proto_rawDescData = file_pb_pstore_proto_rawDesc
)
func file_pb_pstore_proto_rawDescGZIP() []byte {
file_pb_pstore_proto_rawDescOnce.Do(func() {
file_pb_pstore_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_pstore_proto_rawDescData)
})
return file_pb_pstore_proto_rawDescData
}
var file_pb_pstore_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pb_pstore_proto_goTypes = []interface{}{
(*AddrBookRecord)(nil), // 0: pstore.pb.AddrBookRecord
(*AddrBookRecord_AddrEntry)(nil), // 1: pstore.pb.AddrBookRecord.AddrEntry
(*AddrBookRecord_CertifiedRecord)(nil), // 2: pstore.pb.AddrBookRecord.CertifiedRecord
}
var file_pb_pstore_proto_depIdxs = []int32{
1, // 0: pstore.pb.AddrBookRecord.addrs:type_name -> pstore.pb.AddrBookRecord.AddrEntry
2, // 1: pstore.pb.AddrBookRecord.certified_record:type_name -> pstore.pb.AddrBookRecord.CertifiedRecord
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_pb_pstore_proto_init() }
func file_pb_pstore_proto_init() {
if File_pb_pstore_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_pstore_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AddrBookRecord); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pstore_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AddrBookRecord_AddrEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pstore_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AddrBookRecord_CertifiedRecord); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_pstore_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_pb_pstore_proto_goTypes,
DependencyIndexes: file_pb_pstore_proto_depIdxs,
MessageInfos: file_pb_pstore_proto_msgTypes,
}.Build()
File_pb_pstore_proto = out.File
file_pb_pstore_proto_rawDesc = nil
file_pb_pstore_proto_goTypes = nil
file_pb_pstore_proto_depIdxs = nil
}

View File

@@ -0,0 +1,35 @@
syntax = "proto3";
package pstore.pb;
// AddrBookRecord represents a record for a peer in the address book.
message AddrBookRecord {
// The peer ID.
bytes id = 1;
// The multiaddresses. This is a sorted list where element 0 expires the soonest.
repeated AddrEntry addrs = 2;
// The most recently received signed PeerRecord.
CertifiedRecord certified_record = 3;
// AddrEntry represents a single multiaddress.
message AddrEntry {
bytes addr = 1;
// The point in time when this address expires.
int64 expiry = 2;
// The original TTL of this address.
int64 ttl = 3;
}
// CertifiedRecord contains a serialized signed PeerRecord used to
// populate the signedAddrs list.
message CertifiedRecord {
// The Seq counter from the signed PeerRecord envelope
uint64 seq = 1;
// The serialized bytes of the SignedEnvelope containing the PeerRecord.
bytes raw = 2;
}
}

View File

@@ -0,0 +1,193 @@
package pstoreds
import (
"context"
"fmt"
"io"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-base32"
)
//go:generate protoc --proto_path=$PWD:$PWD/../../../.. --go_out=. --go_opt=Mpb/pstore.proto=./pb pb/pstore.proto
// Configuration object for the peerstore.
type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint
// MaxProtocols is the maximum number of protocols we store for one peer.
MaxProtocols int
// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
// automatically, but it'll be available on demand via explicit calls.
GCPurgeInterval time.Duration
// Interval to renew the GC lookahead window. If this is a zero value, lookahead will be disabled and we'll
// traverse the entire datastore for every purge cycle.
GCLookaheadInterval time.Duration
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
// before starting GC.
GCInitialDelay time.Duration
Clock clock
}
// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
//
// * Cache size: 1024.
// * MaxProtocols: 1024.
// * GC purge interval: 2 hours.
// * GC lookahead interval: disabled.
// * GC initial delay: 60 seconds.
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
MaxProtocols: 1024,
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
Clock: realclock{},
}
}
type pstoreds struct {
peerstore.Metrics
*dsKeyBook
*dsAddrBook
*dsProtoBook
*dsPeerMetadata
}
var _ peerstore.Peerstore = &pstoreds{}
// NewPeerstore creates a peerstore backed by the provided persistent datastore.
// It's the caller's responsibility to call RemovePeer to ensure
// that memory consumption of the peerstore doesn't grow unboundedly.
func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (*pstoreds, error) {
addrBook, err := NewAddrBook(ctx, store, opts)
if err != nil {
return nil, err
}
keyBook, err := NewKeyBook(ctx, store, opts)
if err != nil {
return nil, err
}
peerMetadata, err := NewPeerMetadata(ctx, store, opts)
if err != nil {
return nil, err
}
protoBook, err := NewProtoBook(peerMetadata, WithMaxProtocols(opts.MaxProtocols))
if err != nil {
return nil, err
}
return &pstoreds{
Metrics: pstore.NewMetrics(),
dsKeyBook: keyBook,
dsAddrBook: addrBook,
dsPeerMetadata: peerMetadata,
dsProtoBook: protoBook,
}, nil
}
// uniquePeerIds extracts and returns unique peer IDs from database keys.
func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) {
var (
q = query.Query{Prefix: prefix.String(), KeysOnly: true}
results query.Results
err error
)
if results, err = ds.Query(context.TODO(), q); err != nil {
log.Error(err)
return nil, err
}
defer results.Close()
idset := make(map[string]struct{})
for result := range results.Next() {
k := extractor(result)
idset[k] = struct{}{}
}
if len(idset) == 0 {
return peer.IDSlice{}, nil
}
ids := make(peer.IDSlice, 0, len(idset))
for id := range idset {
pid, _ := base32.RawStdEncoding.DecodeString(id)
id, _ := peer.IDFromBytes(pid)
ids = append(ids, id)
}
return ids, nil
}
func (ps *pstoreds) Close() (err error) {
var errs []error
weakClose := func(name string, c interface{}) {
if cl, ok := c.(io.Closer); ok {
if err = cl.Close(); err != nil {
errs = append(errs, fmt.Errorf("%s error: %s", name, err))
}
}
}
weakClose("keybook", ps.dsKeyBook)
weakClose("addressbook", ps.dsAddrBook)
weakClose("protobook", ps.dsProtoBook)
weakClose("peermetadata", ps.dsPeerMetadata)
if len(errs) > 0 {
return fmt.Errorf("failed while closing peerstore; err(s): %q", errs)
}
return nil
}
func (ps *pstoreds) Peers() peer.IDSlice {
set := map[peer.ID]struct{}{}
for _, p := range ps.PeersWithKeys() {
set[p] = struct{}{}
}
for _, p := range ps.PeersWithAddrs() {
set[p] = struct{}{}
}
pps := make(peer.IDSlice, 0, len(set))
for p := range set {
pps = append(pps, p)
}
return pps
}
func (ps *pstoreds) PeerInfo(p peer.ID) peer.AddrInfo {
return peer.AddrInfo{
ID: p,
Addrs: ps.dsAddrBook.Addrs(p),
}
}
// RemovePeer removes entries associated with a peer from:
// * the KeyBook
// * the ProtoBook
// * the PeerMetadata
// * the Metrics
// It DOES NOT remove the peer from the AddrBook.
func (ps *pstoreds) RemovePeer(p peer.ID) {
ps.dsKeyBook.RemovePeer(p)
ps.dsProtoBook.RemovePeer(p)
ps.dsPeerMetadata.RemovePeer(p)
ps.Metrics.RemovePeer(p)
}
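
For reference, a construction sketch in test form, in keeping with this commit's focus on peerstore tests. The test name is hypothetical and the datastore choice is illustrative only (a mutex-wrapped MapDatastore, assumed here to satisfy ds.Batching as it does elsewhere in libp2p test code):

package pstoreds

import (
    "context"
    "testing"

    ds "github.com/ipfs/go-datastore"
    dssync "github.com/ipfs/go-datastore/sync"
)

// TestNewPeerstoreSketch builds the datastore-backed peerstore with the
// default options and checks that it starts out empty.
func TestNewPeerstoreSketch(t *testing.T) {
    store := dssync.MutexWrap(ds.NewMapDatastore())
    ps, err := NewPeerstore(context.Background(), store, DefaultOpts())
    if err != nil {
        t.Fatal(err)
    }
    defer ps.Close()
    if got := len(ps.Peers()); got != 0 {
        t.Fatalf("expected empty peerstore, got %d peers", got)
    }
}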

View File

@@ -0,0 +1,196 @@
package pstoreds
import (
"errors"
"fmt"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
)
type protoSegment struct {
sync.RWMutex
}
type protoSegments [256]*protoSegment
func (s *protoSegments) get(p peer.ID) *protoSegment {
return s[byte(p[len(p)-1])]
}
var errTooManyProtocols = errors.New("too many protocols")
type ProtoBookOption func(*dsProtoBook) error
func WithMaxProtocols(num int) ProtoBookOption {
return func(pb *dsProtoBook) error {
pb.maxProtos = num
return nil
}
}
type dsProtoBook struct {
segments protoSegments
meta pstore.PeerMetadata
maxProtos int
}
var _ pstore.ProtoBook = (*dsProtoBook)(nil)
func NewProtoBook(meta pstore.PeerMetadata, opts ...ProtoBookOption) (*dsProtoBook, error) {
pb := &dsProtoBook{
meta: meta,
segments: func() (ret protoSegments) {
for i := range ret {
ret[i] = &protoSegment{}
}
return ret
}(),
maxProtos: 1024,
}
for _, opt := range opts {
if err := opt(pb); err != nil {
return nil, err
}
}
return pb, nil
}
func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...protocol.ID) error {
if len(protos) > pb.maxProtos {
return errTooManyProtocols
}
protomap := make(map[protocol.ID]struct{}, len(protos))
for _, proto := range protos {
protomap[proto] = struct{}{}
}
s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
return pb.meta.Put(p, "protocols", protomap)
}
func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...protocol.ID) error {
s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
pmap, err := pb.getProtocolMap(p)
if err != nil {
return err
}
if len(pmap)+len(protos) > pb.maxProtos {
return errTooManyProtocols
}
for _, proto := range protos {
pmap[proto] = struct{}{}
}
return pb.meta.Put(p, "protocols", pmap)
}
func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}
res := make([]protocol.ID, 0, len(pmap))
for proto := range pmap {
res = append(res, proto)
}
return res, nil
}
func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...protocol.ID) ([]protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
pmap, err := pb.getProtocolMap(p)
if err != nil {
return nil, err
}
res := make([]protocol.ID, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
res = append(res, proto)
}
}
return res, nil
}
func (pb *dsProtoBook) FirstSupportedProtocol(p peer.ID, protos ...protocol.ID) (protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
pmap, err := pb.getProtocolMap(p)
if err != nil {
return "", err
}
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
return proto, nil
}
}
return "", nil
}
func (pb *dsProtoBook) RemoveProtocols(p peer.ID, protos ...protocol.ID) error {
s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
pmap, err := pb.getProtocolMap(p)
if err != nil {
return err
}
if len(pmap) == 0 {
// nothing to do.
return nil
}
for _, proto := range protos {
delete(pmap, proto)
}
return pb.meta.Put(p, "protocols", pmap)
}
func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[protocol.ID]struct{}, error) {
iprotomap, err := pb.meta.Get(p, "protocols")
switch err {
default:
return nil, err
case pstore.ErrNotFound:
return make(map[protocol.ID]struct{}), nil
case nil:
cast, ok := iprotomap.(map[protocol.ID]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}
return cast, nil
}
}
func (pb *dsProtoBook) RemovePeer(p peer.ID) {
pb.meta.RemovePeer(p)
}

View File

@@ -0,0 +1,530 @@
package pstoremem
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("peerstore")
type expiringAddr struct {
Addr ma.Multiaddr
TTL time.Duration
Expires time.Time
}
func (e *expiringAddr) ExpiredBy(t time.Time) bool {
return !t.Before(e.Expires)
}
type peerRecordState struct {
Envelope *record.Envelope
Seq uint64
}
type addrSegments [256]*addrSegment
type addrSegment struct {
sync.RWMutex
// Use pointers to save memory. Maps always leave some fraction of their
// space unused. Storing the *values* directly in the map would
// drastically increase the space waste, in our case by 6x.
addrs map[peer.ID]map[string]*expiringAddr
signedPeerRecords map[peer.ID]*peerRecordState
}
func (segments *addrSegments) get(p peer.ID) *addrSegment {
if len(p) == 0 { // it's not terribly useful to use an empty peer ID, but at least we should not panic
return segments[0]
}
return segments[uint8(p[len(p)-1])]
}
type clock interface {
Now() time.Time
}
type realclock struct{}
func (rc realclock) Now() time.Time {
return time.Now()
}
// memoryAddrBook manages addresses.
type memoryAddrBook struct {
segments addrSegments
refCount sync.WaitGroup
cancel func()
subManager *AddrSubManager
clock clock
}
var _ pstore.AddrBook = (*memoryAddrBook)(nil)
var _ pstore.CertifiedAddrBook = (*memoryAddrBook)(nil)
func NewAddrBook() *memoryAddrBook {
ctx, cancel := context.WithCancel(context.Background())
ab := &memoryAddrBook{
segments: func() (ret addrSegments) {
for i := range ret {
ret[i] = &addrSegment{
addrs: make(map[peer.ID]map[string]*expiringAddr),
signedPeerRecords: make(map[peer.ID]*peerRecordState)}
}
return ret
}(),
subManager: NewAddrSubManager(),
cancel: cancel,
clock: realclock{},
}
ab.refCount.Add(1)
go ab.background(ctx)
return ab
}
type AddrBookOption func(book *memoryAddrBook) error
func WithClock(clock clock) AddrBookOption {
return func(book *memoryAddrBook) error {
book.clock = clock
return nil
}
}
// background periodically schedules a gc
func (mab *memoryAddrBook) background(ctx context.Context) {
defer mab.refCount.Done()
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
mab.gc()
case <-ctx.Done():
return
}
}
}
func (mab *memoryAddrBook) Close() error {
mab.cancel()
mab.refCount.Wait()
return nil
}
// gc garbage collects the in-memory address book.
func (mab *memoryAddrBook) gc() {
now := mab.clock.Now()
for _, s := range mab.segments {
s.Lock()
for p, amap := range s.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
}
}
if len(amap) == 0 {
delete(s.addrs, p)
delete(s.signedPeerRecords, p)
}
}
s.Unlock()
}
}
func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice {
// deduplicate, since the same peer could have both signed & unsigned addrs
set := make(map[peer.ID]struct{})
for _, s := range mab.segments {
s.RLock()
for pid, amap := range s.addrs {
if len(amap) > 0 {
set[pid] = struct{}{}
}
}
s.RUnlock()
}
peers := make(peer.IDSlice, 0, len(set))
for pid := range set {
peers = append(peers, pid)
}
return peers
}
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs gives memoryAddrBook addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// This function never reduces the TTL or expiration of an address.
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// if we have a valid peer record, ignore unsigned addrs
// peerRec := mab.GetPeerRecord(p)
// if peerRec != nil {
// return
// }
mab.addAddrs(p, addrs, ttl)
}
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
r, err := recordEnvelope.Record()
if err != nil {
return false, err
}
rec, ok := r.(*peer.PeerRecord)
if !ok {
return false, fmt.Errorf("unable to process envelope: not a PeerRecord")
}
if !rec.PeerID.MatchesPublicKey(recordEnvelope.PublicKey) {
return false, fmt.Errorf("signing key does not match PeerID in PeerRecord")
}
// ensure seq is greater than, or equal to, the last received
s := mab.segments.get(rec.PeerID)
s.Lock()
defer s.Unlock()
lastState, found := s.signedPeerRecords[rec.PeerID]
if found && lastState.Seq > rec.Seq {
return false, nil
}
s.signedPeerRecords[rec.PeerID] = &peerRecordState{
Envelope: recordEnvelope,
Seq: rec.Seq,
}
mab.addAddrsUnlocked(s, rec.PeerID, rec.Addrs, ttl, true)
return true, nil
}
func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
mab.addAddrsUnlocked(s, p, addrs, ttl, false)
}
func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, signed bool) {
// if ttl is zero, exit. nothing to do.
if ttl <= 0 {
return
}
amap, ok := s.addrs[p]
if !ok {
amap = make(map[string]*expiringAddr)
s.addrs[p] = amap
}
exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs {
// Remove suffix of /p2p/peer-id from address
addr, addrPid := peer.SplitAddr(addr)
if addr == nil {
log.Warnw("Was passed nil multiaddr", "peer", p)
continue
}
if addrPid != "" && addrPid != p {
log.Warnf("Was passed p2p address with a different peerId. found: %s, expected: %s", addrPid, p)
continue
}
// find the highest TTL and Expiry time between
// existing records and function args
a, found := amap[string(addr.Bytes())] // won't allocate.
if !found {
// not found, announce it.
entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
amap[string(addr.Bytes())] = entry
mab.subManager.BroadcastAddr(p, addr)
} else {
// update ttl & exp to whichever is greater between new and existing entry
if ttl > a.TTL {
a.TTL = ttl
}
if exp.After(a.Expires) {
a.Expires = exp
}
}
}
}
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
amap, ok := s.addrs[p]
if !ok {
amap = make(map[string]*expiringAddr)
s.addrs[p] = amap
}
exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs {
addr, addrPid := peer.SplitAddr(addr)
if addr == nil {
log.Warnw("was passed nil multiaddr", "peer", p)
continue
}
if addrPid != "" && addrPid != p {
log.Warnf("was passed p2p address with a different peerId, found: %s wanted: %s", addrPid, p)
continue
}
aBytes := addr.Bytes()
key := string(aBytes)
// re-set all of them for new ttl.
if ttl > 0 {
amap[key] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
mab.subManager.BroadcastAddr(p, addr)
} else {
delete(amap, key)
}
}
}
// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
exp := mab.clock.Now().Add(newTTL)
amap, found := s.addrs[p]
if !found {
return
}
for k, a := range amap {
if oldTTL == a.TTL {
if newTTL == 0 {
delete(amap, k)
} else {
a.TTL = newTTL
a.Expires = exp
amap[k] = a
}
}
}
}
// Addrs returns all known (and valid) addresses for a given peer
func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
s := mab.segments.get(p)
s.RLock()
defer s.RUnlock()
return validAddrs(mab.clock.Now(), s.addrs[p])
}
func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr {
good := make([]ma.Multiaddr, 0, len(amap))
if amap == nil {
return good
}
for _, m := range amap {
if !m.ExpiredBy(now) {
good = append(good, m.Addr)
}
}
return good
}
// GetPeerRecord returns a Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope {
s := mab.segments.get(p)
s.RLock()
defer s.RUnlock()
// although the signed record gets garbage collected when all addrs inside it are expired,
// we may be in between the expiration time and the GC interval
// so, we check to see if we have any valid signed addrs before returning the record
if len(validAddrs(mab.clock.Now(), s.addrs[p])) == 0 {
return nil
}
state := s.signedPeerRecords[p]
if state == nil {
return nil
}
return state.Envelope
}
// ClearAddrs removes all previously stored addresses
func (mab *memoryAddrBook) ClearAddrs(p peer.ID) {
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
delete(s.addrs, p)
delete(s.signedPeerRecords, p)
}
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
s := mab.segments.get(p)
s.RLock()
defer s.RUnlock()
baseaddrslice := s.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
}
return mab.subManager.AddrStream(ctx, p, initial)
}
type addrSub struct {
pubch chan ma.Multiaddr
ctx context.Context
}
func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}
// An abstracted, pub-sub manager for address streams. Extracted from
// memoryAddrBook in order to support additional implementations.
type AddrSubManager struct {
mu sync.RWMutex
subs map[peer.ID][]*addrSub
}
// NewAddrSubManager initializes an AddrSubManager.
func NewAddrSubManager() *AddrSubManager {
return &AddrSubManager{
subs: make(map[peer.ID][]*addrSub),
}
}
// Used internally by the address stream coroutine to remove a subscription
// from the manager.
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
subs := mgr.subs[p]
if len(subs) == 1 {
if subs[0] != s {
return
}
delete(mgr.subs, p)
return
}
for i, v := range subs {
if v == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
mgr.subs[p] = subs[:len(subs)-1]
return
}
}
}
// BroadcastAddr broadcasts a new address to all subscribed streams.
func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
if subs, ok := mgr.subs[p]; ok {
for _, sub := range subs {
sub.pubAddr(addr)
}
}
}
// AddrStream creates a new subscription for a given peer ID, pre-populating the
// channel with any addresses we might already have on file.
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
out := make(chan ma.Multiaddr)
mgr.mu.Lock()
mgr.subs[p] = append(mgr.subs[p], sub)
mgr.mu.Unlock()
sort.Sort(addrList(initial))
go func(buffer []ma.Multiaddr) {
defer close(out)
sent := make(map[string]struct{}, len(buffer))
for _, a := range buffer {
sent[string(a.Bytes())] = struct{}{}
}
var outch chan ma.Multiaddr
var next ma.Multiaddr
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
outch = out
}
for {
select {
case outch <- next:
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
} else {
outch = nil
next = nil
}
case naddr := <-sub.pubch:
if _, ok := sent[string(naddr.Bytes())]; ok {
continue
}
sent[string(naddr.Bytes())] = struct{}{}
if next == nil {
next = naddr
outch = out
} else {
buffer = append(buffer, naddr)
}
case <-ctx.Done():
mgr.removeSub(p, sub)
return
}
}
}(initial)
return out
}
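
The AddrStream/AddrSubManager machinery above publishes newly discovered addresses on a channel until the subscriber's context is cancelled, at which point the channel is closed. A small consumer sketch (watchAddrs is a hypothetical helper relying on this file's existing imports):

// watchAddrs logs every address published for p until ctx is cancelled; the
// subscription goroutine closes the channel once the context is done.
func watchAddrs(ctx context.Context, ab *memoryAddrBook, p peer.ID) {
    for addr := range ab.AddrStream(ctx, p) {
        log.Debugw("new address", "peer", p, "addr", addr)
    }
}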

View File

@@ -0,0 +1,84 @@
package pstoremem
import (
"testing"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
mockClock "github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
func TestInvalidOption(t *testing.T) {
_, err := NewPeerstore(1337)
require.EqualError(t, err, "unexpected peer store option: 1337")
}
func TestFuzzInMemoryPeerstore(t *testing.T) {
// Just create and close a bunch of peerstores. If this leaks, we'll
// catch it in the leak check below.
for i := 0; i < 100; i++ {
ps, err := NewPeerstore()
require.NoError(t, err)
ps.Close()
}
}
func TestInMemoryPeerstore(t *testing.T) {
pt.TestPeerstore(t, func() (pstore.Peerstore, func()) {
ps, err := NewPeerstore()
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}
func TestPeerstoreProtoStoreLimits(t *testing.T) {
const limit = 10
ps, err := NewPeerstore(WithMaxProtocols(limit))
require.NoError(t, err)
defer ps.Close()
pt.TestPeerstoreProtoStoreLimits(t, ps, limit)
}
func TestInMemoryAddrBook(t *testing.T) {
clk := mockClock.NewMock()
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
ps, err := NewPeerstore(WithClock(clk))
require.NoError(t, err)
return ps, func() { ps.Close() }
}, clk)
}
func TestInMemoryKeyBook(t *testing.T) {
pt.TestKeyBook(t, func() (pstore.KeyBook, func()) {
ps, err := NewPeerstore()
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}
func BenchmarkInMemoryPeerstore(b *testing.B) {
pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) {
ps, err := NewPeerstore()
require.NoError(b, err)
return ps, func() { ps.Close() }
}, "InMem")
}
func BenchmarkInMemoryKeyBook(b *testing.B) {
pt.BenchmarkKeyBook(b, func() (pstore.KeyBook, func()) {
ps, err := NewPeerstore()
require.NoError(b, err)
return ps, func() { ps.Close() }
})
}
func TestMain(m *testing.M) {
goleak.VerifyTestMain(
m,
goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
)
}

View File

@@ -0,0 +1,97 @@
package pstoremem
import (
"errors"
"sync"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
)
type memoryKeyBook struct {
sync.RWMutex // one lock for both maps; contention won't happen often.
pks map[peer.ID]ic.PubKey
sks map[peer.ID]ic.PrivKey
}
var _ pstore.KeyBook = (*memoryKeyBook)(nil)
func NewKeyBook() *memoryKeyBook {
return &memoryKeyBook{
pks: map[peer.ID]ic.PubKey{},
sks: map[peer.ID]ic.PrivKey{},
}
}
func (mkb *memoryKeyBook) PeersWithKeys() peer.IDSlice {
mkb.RLock()
ps := make(peer.IDSlice, 0, len(mkb.pks)+len(mkb.sks))
for p := range mkb.pks {
ps = append(ps, p)
}
for p := range mkb.sks {
if _, found := mkb.pks[p]; !found {
ps = append(ps, p)
}
}
mkb.RUnlock()
return ps
}
func (mkb *memoryKeyBook) PubKey(p peer.ID) ic.PubKey {
mkb.RLock()
pk := mkb.pks[p]
mkb.RUnlock()
if pk != nil {
return pk
}
pk, err := p.ExtractPublicKey()
if err == nil {
mkb.Lock()
mkb.pks[p] = pk
mkb.Unlock()
}
return pk
}
func (mkb *memoryKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
// check it's correct first
if !p.MatchesPublicKey(pk) {
return errors.New("ID does not match PublicKey")
}
mkb.Lock()
mkb.pks[p] = pk
mkb.Unlock()
return nil
}
func (mkb *memoryKeyBook) PrivKey(p peer.ID) ic.PrivKey {
mkb.RLock()
defer mkb.RUnlock()
return mkb.sks[p]
}
func (mkb *memoryKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
if sk == nil {
return errors.New("sk is nil (PrivKey)")
}
// check it's correct first
if !p.MatchesPrivateKey(sk) {
return errors.New("ID does not match PrivateKey")
}
mkb.Lock()
mkb.sks[p] = sk
mkb.Unlock()
return nil
}
func (mkb *memoryKeyBook) RemovePeer(p peer.ID) {
mkb.Lock()
delete(mkb.sks, p)
delete(mkb.pks, p)
mkb.Unlock()
}

View File

@@ -0,0 +1,54 @@
package pstoremem
import (
"sync"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
)
type memoryPeerMetadata struct {
// store other data, like versions
ds map[peer.ID]map[string]interface{}
dslock sync.RWMutex
}
var _ pstore.PeerMetadata = (*memoryPeerMetadata)(nil)
func NewPeerMetadata() *memoryPeerMetadata {
return &memoryPeerMetadata{
ds: make(map[peer.ID]map[string]interface{}),
}
}
func (ps *memoryPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
ps.dslock.Lock()
defer ps.dslock.Unlock()
m, ok := ps.ds[p]
if !ok {
m = make(map[string]interface{})
ps.ds[p] = m
}
m[key] = val
return nil
}
func (ps *memoryPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
ps.dslock.RLock()
defer ps.dslock.RUnlock()
m, ok := ps.ds[p]
if !ok {
return nil, pstore.ErrNotFound
}
val, ok := m[key]
if !ok {
return nil, pstore.ErrNotFound
}
return val, nil
}
func (ps *memoryPeerMetadata) RemovePeer(p peer.ID) {
ps.dslock.Lock()
delete(ps.ds, p)
ps.dslock.Unlock()
}

View File

@@ -0,0 +1,114 @@
package pstoremem
import (
"fmt"
"io"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore"
)
type pstoremem struct {
peerstore.Metrics
*memoryKeyBook
*memoryAddrBook
*memoryProtoBook
*memoryPeerMetadata
}
var _ peerstore.Peerstore = &pstoremem{}
type Option interface{}
// NewPeerstore creates an in-memory thread-safe collection of peers.
// It's the caller's responsibility to call RemovePeer to ensure
// that memory consumption of the peerstore doesn't grow unboundedly.
func NewPeerstore(opts ...Option) (ps *pstoremem, err error) {
ab := NewAddrBook()
defer func() {
if err != nil {
ab.Close()
}
}()
var protoBookOpts []ProtoBookOption
for _, opt := range opts {
switch o := opt.(type) {
case ProtoBookOption:
protoBookOpts = append(protoBookOpts, o)
case AddrBookOption:
o(ab)
default:
return nil, fmt.Errorf("unexpected peer store option: %v", o)
}
}
pb, err := NewProtoBook(protoBookOpts...)
if err != nil {
return nil, err
}
return &pstoremem{
Metrics: pstore.NewMetrics(),
memoryKeyBook: NewKeyBook(),
memoryAddrBook: ab,
memoryProtoBook: pb,
memoryPeerMetadata: NewPeerMetadata(),
}, nil
}
func (ps *pstoremem) Close() (err error) {
var errs []error
weakClose := func(name string, c interface{}) {
if cl, ok := c.(io.Closer); ok {
if err = cl.Close(); err != nil {
errs = append(errs, fmt.Errorf("%s error: %s", name, err))
}
}
}
weakClose("keybook", ps.memoryKeyBook)
weakClose("addressbook", ps.memoryAddrBook)
weakClose("protobook", ps.memoryProtoBook)
weakClose("peermetadata", ps.memoryPeerMetadata)
if len(errs) > 0 {
return fmt.Errorf("failed while closing peerstore; err(s): %q", errs)
}
return nil
}
func (ps *pstoremem) Peers() peer.IDSlice {
set := map[peer.ID]struct{}{}
for _, p := range ps.PeersWithKeys() {
set[p] = struct{}{}
}
for _, p := range ps.PeersWithAddrs() {
set[p] = struct{}{}
}
pps := make(peer.IDSlice, 0, len(set))
for p := range set {
pps = append(pps, p)
}
return pps
}
func (ps *pstoremem) PeerInfo(p peer.ID) peer.AddrInfo {
return peer.AddrInfo{
ID: p,
Addrs: ps.memoryAddrBook.Addrs(p),
}
}
// RemovePeer removes entries associated with a peer from:
// * the KeyBook
// * the ProtoBook
// * the PeerMetadata
// * the Metrics
// It DOES NOT remove the peer from the AddrBook.
func (ps *pstoremem) RemovePeer(p peer.ID) {
ps.memoryKeyBook.RemovePeer(p)
ps.memoryProtoBook.RemovePeer(p)
ps.memoryPeerMetadata.RemovePeer(p)
ps.Metrics.RemovePeer(p)
}
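
For symmetry with the datastore-backed variant above, a construction sketch for the in-memory peerstore (withPeerstore and the 1024 limit are illustrative; the sketch relies on this file's existing imports):

// withPeerstore builds a bounded in-memory peerstore, hands it to f, and
// closes it afterwards. NewPeerstore type-switches the untyped options into
// ProtoBookOption / AddrBookOption values.
func withPeerstore(f func(ps peerstore.Peerstore)) error {
    ps, err := NewPeerstore(WithMaxProtocols(1024))
    if err != nil {
        return err
    }
    defer ps.Close()
    f(ps)
    return nil
}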

View File

@@ -0,0 +1,192 @@
package pstoremem
import (
"errors"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
)
type protoSegment struct {
sync.RWMutex
protocols map[peer.ID]map[protocol.ID]struct{}
}
type protoSegments [256]*protoSegment
func (s *protoSegments) get(p peer.ID) *protoSegment {
return s[byte(p[len(p)-1])]
}
var errTooManyProtocols = errors.New("too many protocols")
type memoryProtoBook struct {
segments protoSegments
maxProtos int
lk sync.RWMutex
interned map[protocol.ID]protocol.ID
}
var _ pstore.ProtoBook = (*memoryProtoBook)(nil)
type ProtoBookOption func(book *memoryProtoBook) error
func WithMaxProtocols(num int) ProtoBookOption {
return func(pb *memoryProtoBook) error {
pb.maxProtos = num
return nil
}
}
func NewProtoBook(opts ...ProtoBookOption) (*memoryProtoBook, error) {
pb := &memoryProtoBook{
interned: make(map[protocol.ID]protocol.ID, 256),
segments: func() (ret protoSegments) {
for i := range ret {
ret[i] = &protoSegment{
protocols: make(map[peer.ID]map[protocol.ID]struct{}),
}
}
return ret
}(),
maxProtos: 1024,
}
for _, opt := range opts {
if err := opt(pb); err != nil {
return nil, err
}
}
return pb, nil
}
func (pb *memoryProtoBook) internProtocol(proto protocol.ID) protocol.ID {
// check if it is interned with the read lock
pb.lk.RLock()
interned, ok := pb.interned[proto]
pb.lk.RUnlock()
if ok {
return interned
}
// intern with the write lock
pb.lk.Lock()
defer pb.lk.Unlock()
// check again in case it got interned in between locks
interned, ok = pb.interned[proto]
if ok {
return interned
}
pb.interned[proto] = proto
return proto
}
func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...protocol.ID) error {
if len(protos) > pb.maxProtos {
return errTooManyProtocols
}
newprotos := make(map[protocol.ID]struct{}, len(protos))
for _, proto := range protos {
newprotos[pb.internProtocol(proto)] = struct{}{}
}
s := pb.segments.get(p)
s.Lock()
s.protocols[p] = newprotos
s.Unlock()
return nil
}
func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...protocol.ID) error {
s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
protomap, ok := s.protocols[p]
if !ok {
protomap = make(map[protocol.ID]struct{})
s.protocols[p] = protomap
}
if len(protomap)+len(protos) > pb.maxProtos {
return errTooManyProtocols
}
for _, proto := range protos {
protomap[pb.internProtocol(proto)] = struct{}{}
}
return nil
}
func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
out := make([]protocol.ID, 0, len(s.protocols[p]))
for k := range s.protocols[p] {
out = append(out, k)
}
return out, nil
}
func (pb *memoryProtoBook) RemoveProtocols(p peer.ID, protos ...protocol.ID) error {
s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
protomap, ok := s.protocols[p]
if !ok {
// nothing to remove.
return nil
}
for _, proto := range protos {
delete(protomap, pb.internProtocol(proto))
}
return nil
}
func (pb *memoryProtoBook) SupportsProtocols(p peer.ID, protos ...protocol.ID) ([]protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
out := make([]protocol.ID, 0, len(protos))
for _, proto := range protos {
if _, ok := s.protocols[p][proto]; ok {
out = append(out, proto)
}
}
return out, nil
}
func (pb *memoryProtoBook) FirstSupportedProtocol(p peer.ID, protos ...protocol.ID) (protocol.ID, error) {
s := pb.segments.get(p)
s.RLock()
defer s.RUnlock()
for _, proto := range protos {
if _, ok := s.protocols[p][proto]; ok {
return proto, nil
}
}
return "", nil
}
func (pb *memoryProtoBook) RemovePeer(p peer.ID) {
s := pb.segments.get(p)
s.Lock()
delete(s.protocols, p)
s.Unlock()
}

View File

@@ -0,0 +1,50 @@
package pstoremem
import (
"bytes"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
)
func isFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
type addrList []ma.Multiaddr
func (al addrList) Len() int { return len(al) }
func (al addrList) Swap(i, j int) { al[i], al[j] = al[j], al[i] }
func (al addrList) Less(i, j int) bool {
a := al[i]
b := al[j]
// dial localhost addresses next, they should fail immediately
lba := manet.IsIPLoopback(a)
lbb := manet.IsIPLoopback(b)
if lba && !lbb {
return true
}
// dial utp and similar 'non-fd-consuming' addresses first
fda := isFDCostlyTransport(a)
fdb := isFDCostlyTransport(b)
if !fda {
return fdb
}
// if 'b' doesn't take a file descriptor
if !fdb {
return false
}
// if 'b' is loopback and both take file descriptors
if lbb {
return false
}
// for the rest, just sort by bytes
return bytes.Compare(a.Bytes(), b.Bytes()) > 0
}

View File

@@ -0,0 +1,20 @@
package pstoremem
import (
"sort"
"testing"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
func TestAddressSorting(t *testing.T) {
u1 := ma.StringCast("/ip4/152.12.23.53/udp/1234/utp")
u2l := ma.StringCast("/ip4/127.0.0.1/udp/1234/utp")
local := ma.StringCast("/ip4/127.0.0.1/tcp/1234")
norm := ma.StringCast("/ip4/6.5.4.3/tcp/1234")
l := addrList{local, u1, u2l, norm}
sort.Sort(l)
require.Equal(t, l, addrList{u2l, u1, local, norm})
}

View File

@@ -0,0 +1,9 @@
# pkg/engine/transport
This is an implementation of typical network handling features: a listener
whose `Accept` method returns a channel on which new inbound connections are
delivered. (todo: is there a standard interface with such a method? answer:
there are plans to make this a standard net.Listener, with the minor caveat
that its `Addr` method can return only one address while we support multiple
bound addresses.) A usage sketch follows below.
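
A hypothetical consumer sketch (`l` is an already constructed listener from this
package and `handle` is a placeholder for application code); `Accept` returns a
receive-only channel of `*Conn`:

```go
// Drain the Accept channel, one goroutine per inbound connection.
for conn := range l.Accept() {
    go handle(conn)
}
```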

View File

@@ -232,6 +232,23 @@ func (l *Listener) ProtocolsAvailable() (p protocols.NetworkProtocols) {
return
}
// TODO: the standard Listener interface really should be implemented here. The Conn already embeds its relevant interface.
// func (l *Listener) Accept() (net.Conn, error) {
// // TODO implement me
// panic("implement me")
// }
//
// func (l *Listener) Close() error {
// // TODO implement me
// panic("implement me")
// }
//
// todo: this is not quite compatible with a listener having multiple bindings, and returning the zero value would make no sense. Return the first one only?
// func (l *Listener) Addr() net.Addr {
// // TODO implement me
// panic("implement me")
// }
// Accept waits on inbound connections and hands them out to listeners on the
// returned channel.
func (l *Listener) Accept() <-chan *Conn { return l.newConns }
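// A possible shape for the blocking adapter discussed in the TODO above (a
// sketch only, not implemented in this commit; the name AcceptBlocking is
// hypothetical, and it assumes Conn satisfies net.Conn and that closing the
// listener closes the newConns channel):
//
// func (l *Listener) AcceptBlocking() (net.Conn, error) {
// 	conn, ok := <-l.newConns
// 	if !ok {
// 		return nil, net.ErrClosed
// 	}
// 	return conn, nil
// }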
@@ -420,6 +437,7 @@ func NewListener(rendezvous, multiAddr []string, storePath string,
if store == nil {
return nil, errors.New("could not open database")
}
interrupt.AddHandler(closer)
var st peerstore.Peerstore
st, e = pstoreds.NewPeerstore(ctx, store, pstoreds.DefaultOpts())
if c.Host, e = libp2p.New(

View File

@@ -64,7 +64,6 @@ func NewFrom(b slice.Bytes) (splicer *Splice) {
}
func (s *Splice) AddrPort(a *netip.AddrPort) *Splice {
log.T.S("addrPort", a)
if a == nil {
log.D.Ln("addrport is NIL! (maybe Listener is not yet initialized?)")
s.Advance(AddrLen, "nil Addresses")