added fetch of metadata and relay lists, and private flag to stop spider fetching
@@ -37,8 +37,9 @@ type C struct {
    Pprof bool `env:"ORLY_PPROF" default:"false" usage:"enable pprof on 127.0.0.1:6060"`
    AuthRequired bool `env:"ORLY_AUTH_REQUIRED" default:"false" usage:"require authentication for all requests"`
    PublicReadable bool `env:"ORLY_PUBLIC_READABLE" default:"true" usage:"allow public read access regardless of whether the client is authed"`
    SpiderSeeds []string `env:"ORLY_SPIDER_SEEDS" usage:"seeds to use for the spider (relays that are looked up initially to find owner relay lists) (comma separated)" default:"wss://relay.nostr.band/,wss://relay.damus.io/,wss://nostr.wine/,wss://nostr.land/,wss://theforest.nostr1.com"`
    SpiderSeeds []string `env:"ORLY_SPIDER_SEEDS" usage:"seeds to use for the spider (relays that are looked up initially to find owner relay lists) (comma separated)" default:"wss://relay.nostr.band/,wss://relay.damus.io/,wss://nostr.wine/,wss://nostr.land/,wss://theforest.nostr1.com/"`
    Owners []string `env:"ORLY_OWNERS" usage:"list of users whose follow lists designate whitelisted users who can publish events, and who can read if public readable is false (comma separated)"`
    Private bool `env:"ORLY_PRIVATE" usage:"do not spider for user metadata because the relay is private and this would leak relay memberships" default:"false"`
}

// New creates and initializes a new configuration object for the relay
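
A minimal sketch of how the two new knobs might be exercised, assuming the struct tags above are consumed by an env-driven loader (the loader itself is not part of this diff; the parsing below is illustrative only, not orly's actual config code):

package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
)

func main() {
    // Mark the relay private so the spider never fetches remotely,
    // and override the default seed list.
    os.Setenv("ORLY_PRIVATE", "true")
    os.Setenv("ORLY_SPIDER_SEEDS", "wss://relay.damus.io/,wss://nostr.wine/")

    private, _ := strconv.ParseBool(os.Getenv("ORLY_PRIVATE"))
    seeds := strings.Split(os.Getenv("ORLY_SPIDER_SEEDS"), ",")
    fmt.Println(private, seeds)
}
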
@@ -4,9 +4,9 @@ import (
    "bytes"
    "errors"
    "fmt"

    "orly.dev/pkg/encoders/event"
    "orly.dev/pkg/encoders/filter"
    "orly.dev/pkg/encoders/kind"
    "orly.dev/pkg/encoders/kinds"
    "orly.dev/pkg/encoders/tag"
    "orly.dev/pkg/encoders/tags"
@@ -32,21 +32,22 @@ import (
//
// # Expected Behaviour
//
// - For ephemeral events, the method does not store them and returns
// - For ephemeral events, the method doesn't store them and returns
// immediately.
//
// - For replaceable events, it first queries for existing similar events,
// deletes older ones, and then stores the new event.
//
// - For parameterized replaceable events, it performs a similar process but uses additional tags to identify duplicates.
// - For parameterized replaceable events, it performs a similar process but
// uses additional tags to identify duplicates.
func (s *Server) Publish(c context.T, evt *event.E) (err error) {
    sto := s.relay.Storage()
    if evt.Kind.IsEphemeral() {
        // do not store ephemeral events
        // don't store ephemeral events
        return nil

    } else if evt.Kind.IsReplaceable() {
        // replaceable event, delete before storing
        // replaceable event, delete old after storing
        var evs []*event.E
        f := filter.New()
        f.Authors = tag.New(evt.Pubkey)
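
The replaceable-event behaviour described in the comment block above comes down to a timestamp comparison against whatever is already stored. A rough sketch of that decision, reusing the event.E accessors that appear in this diff (the helper name is hypothetical and not part of the commit):

func shouldReplace(existing []*event.E, incoming *event.E) bool {
    for _, ev := range existing {
        // an already-stored event that is newer wins; keep it and
        // reject the incoming one
        if ev.CreatedAt.Int() > incoming.CreatedAt.Int() {
            return false
        }
    }
    return true
}
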
@@ -66,13 +67,63 @@ func (s *Server) Publish(c context.T, evt *event.E) (err error) {
                "maybe replace %s with %s", ev.Serialize(), evt.Serialize(),
            )
            if ev.CreatedAt.Int() > evt.CreatedAt.Int() {
                return errorf.W(string(normalize.Invalid.F("not replacing newer replaceable event")))
                return errorf.W(
                    string(
                        normalize.Invalid.F(
                            "not replacing newer replaceable event",
                        ),
                    ),
                )
            }
            // not deleting these events because some clients misbehave;
            // the query will pull the new one, and a backup can recover
            // the data of old ones
            if ev.Kind.IsDirectoryEvent() {
                del = false
                if evt.Kind.Equal(kind.FollowList) {
                    // if the event is from someone on ownersFollowed or
                    // followedFollows, for now add to this list so they're
                    // immediately effective.
                    var isFollowed bool
                    ownersFollowed := s.OwnersFollowed()
                    for _, pk := range ownersFollowed {
                        if bytes.Equal(evt.Pubkey, pk) {
                            isFollowed = true
                        }
                    }
                    if isFollowed {
                        if _, _, err = sto.SaveEvent(
                            c, evt,
                        ); err != nil && !errors.Is(
                            err, store.ErrDupEvent,
                        ) {
                            return
                        }
                        // we need to trigger the spider with no fetch
                        if err = s.Spider(true); chk.E(err) {
                            err = nil
                        }
                        // event has been saved and lists updated.
                        return
                    }

                }
                if evt.Kind.Equal(kind.MuteList) {
                    // check if this is one of the owners, if so, the mute list
                    // should be applied immediately.
                    owners := s.OwnersPubkeys()
                    for _, pk := range owners {
                        if bytes.Equal(evt.Pubkey, pk) {
                            if _, _, err = sto.SaveEvent(
                                c, evt,
                            ); err != nil && !errors.Is(
                                err, store.ErrDupEvent,
                            ) {
                                return
                            }
                            // we need to trigger the spider with no fetch
                            if err = s.Spider(true); chk.E(err) {
                                err = nil
                            }
                            // event has been saved and lists updated.
                            return
                        }
                    }
                }
                // defer the delete until after the save, further down, has
                // completed.
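
The two membership scans above are the same pattern; a hypothetical helper (not part of this commit) that captures it, using the same bytes.Equal comparison the diff uses and adding an early exit:

func containsPubkey(list [][]byte, pk []byte) bool {
    for _, candidate := range list {
        if bytes.Equal(candidate, pk) {
            return true
        }
    }
    return false
}

The isFollowed check would then read as containsPubkey(s.OwnersFollowed(), evt.Pubkey), and the mute-list branch as containsPubkey(s.OwnersPubkeys(), evt.Pubkey).
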
@@ -184,11 +184,29 @@ func (s *Server) Start(
) (err error) {
    if len(s.C.Owners) > 0 {
        // start up spider
        if err = s.Spider(); chk.E(err) {
        if err = s.Spider(s.C.Private); chk.E(err) {
            // there weren't any owners, or they couldn't be found on the spider
            // seeds.
            err = nil
        }
    }
    // start up a spider run to trigger every 30 minutes
    ticker := time.NewTicker(30 * time.Minute)
    go func() {
        for {
            select {
            case <-ticker.C:
                if err = s.Spider(s.C.Private); chk.E(err) {
                    // there weren't any owners, or they couldn't be found on the spider
                    // seeds.
                    err = nil
                }
            case <-s.Ctx.Done():
                log.I.F("stopping spider ticker")
                return
            }
        }
    }()
    addr := net.JoinHostPort(host, strconv.Itoa(port))
    log.I.F("starting relay listener at %s", addr)
    ln, err := net.Listen("tcp", addr)
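
One thing to note in the hunk above: the ticker goroutine assigns to Start's named return value err from a background goroutine. A minimal variant that keeps the error local to the goroutine, assuming the same s.Spider, s.Ctx and chk.E helpers as above:

    go func() {
        for {
            select {
            case <-ticker.C:
                // chk.E logs the error; nothing else needs to escape
                // the goroutine
                if spiderErr := s.Spider(s.C.Private); chk.E(spiderErr) {
                    continue
                }
            case <-s.Ctx.Done():
                return
            }
        }
    }()
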
@@ -230,8 +248,10 @@ func (s *Server) Shutdown() {
    s.Cancel()
    log.W.Ln("closing event store")
    chk.E(s.relay.Storage().Close())
    log.W.Ln("shutting down relay listener")
    chk.E(s.httpServer.Shutdown(s.Ctx))
    if s.httpServer != nil {
        log.W.Ln("shutting down relay listener")
        chk.E(s.httpServer.Shutdown(s.Ctx))
    }
    if f, ok := s.relay.(relay.ShutdownAware); ok {
        f.OnShutdown(s.Ctx)
    }
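
The change above only guards against a nil httpServer. If a bounded shutdown is ever wanted, the standard net/http Shutdown accepts any context, so a deadline can be attached; a sketch using the standard library context and time packages (not part of this commit, and orly's own context wrapper may differ):

    if s.httpServer != nil {
        // give in-flight requests up to five seconds to finish
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        chk.E(s.httpServer.Shutdown(ctx))
    }
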
@@ -16,35 +16,67 @@ import (
)

func (s *Server) SpiderFetch(
    k *kind.T, pubkeys ...[]byte,
    k *kind.T, noFetch bool, pubkeys ...[]byte,
) (pks [][]byte, err error) {
    // first search the local database
    pkList := tag.New(pubkeys...)
    f := &filter.F{
        Kinds: kinds.New(k),
        Authors: pkList,
    }
    // first search the local database
    var evs event.S
    if evs, err = s.Storage().QueryEvents(s.Ctx, f); chk.E(err) {
        // none were found, so we need to scan the spiders
        err = nil
    }
    if len(evs) < len(pubkeys) {
    if len(evs) < len(pubkeys) && !noFetch {
        // we need to search the spider seeds.
        for _, seed := range s.C.SpiderSeeds {
            var evss event.S
            var cli *ws.Client
            if cli, err = ws.RelayConnect(context.Bg(), seed); chk.E(err) {
        // Break up pubkeys into batches of 512
        log.I.F("breaking up %d pubkeys into batches of 512", len(pubkeys))
        for i := 0; i < len(pubkeys); i += 512 {
            end := i + 512
            if end > len(pubkeys) {
                end = len(pubkeys)
            }
            batchPubkeys := pubkeys[i:end]
            log.I.F(
                "processing batch %d to %d of %d pubkeys", i, end, len(pubkeys),
            )
            batchPkList := tag.New(batchPubkeys...)
            batchFilter := &filter.F{
                Kinds: kinds.New(k),
                Authors: batchPkList,
            }

            for _, seed := range s.C.SpiderSeeds {
                select {
                case <-s.Ctx.Done():
                    return
                default:
                }
                var evss event.S
                var cli *ws.Client
                if cli, err = ws.RelayConnect(context.Bg(), seed); chk.E(err) {
                    err = nil
                    continue
                }
                if evss, err = cli.QuerySync(
                    context.Bg(), batchFilter,
                ); chk.E(err) {
                    err = nil
                    continue
                }
                for _, ev := range evss {
                    evs = append(evs, ev)
                }
            }
        }
        // save the events to the database
        for _, ev := range evs {
            if _, _, err = s.Storage().SaveEvent(s.Ctx, ev); chk.E(err) {
                err = nil
                continue
            }
            if evss, err = cli.QuerySync(context.Bg(), f); chk.E(err) {
                err = nil
                continue
            }
            for _, ev := range evss {
                evs = append(evs, ev)
            }
        }
    }
    // deduplicate and take the newest
@@ -63,13 +95,6 @@ func (s *Server) SpiderFetch(
        tmp = append(tmp, evm[0])
    }
    evs = tmp
    // save the events to the database
    for _, ev := range evs {
        if _, _, err = s.Storage().SaveEvent(s.Ctx, ev); chk.E(err) {
            err = nil
            continue
        }
    }
    // we have all we're going to get now
    pkMap := make(map[string]struct{})
    for _, ev := range evs {
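
The batching introduced above (fixed windows of 512 pubkeys) is a generic slice operation; a hypothetical helper that captures it, sketched with Go generics and not part of this commit:

func chunk[T any](items []T, size int) (batches [][]T) {
    for i := 0; i < len(items); i += size {
        end := i + size
        if end > len(items) {
            end = len(items)
        }
        batches = append(batches, items[i:end])
    }
    return
}

The fetch loop would then iterate for _, batchPubkeys := range chunk(pubkeys, 512) and build the batch filter from each slice.
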
@@ -10,7 +10,7 @@ import (
    "orly.dev/pkg/utils/log"
)

func (s *Server) Spider() (err error) {
func (s *Server) Spider(noFetch ...bool) (err error) {
    var ownersPubkeys [][]byte
    for _, v := range s.C.Owners {
        var prf []byte
@@ -44,24 +44,28 @@ func (s *Server) Spider() (err error) {
        // there is no OwnersPubkeys, so there is nothing to do.
        return
    }
    dontFetch := false
    if len(noFetch) > 0 && noFetch[0] {
        dontFetch = true
    }
    log.I.F("getting ownersFollowed")
    var ownersFollowed [][]byte
    if ownersFollowed, err = s.SpiderFetch(
        kind.FollowList, ownersPubkeys...,
        kind.FollowList, dontFetch, ownersPubkeys...,
    ); chk.E(err) {
        return
    }
    log.I.F("getting followedFollows")
    var followedFollows [][]byte
    if followedFollows, err = s.SpiderFetch(
        kind.FollowList, ownersFollowed...,
        kind.FollowList, dontFetch, ownersFollowed...,
    ); chk.E(err) {
        return
    }
    log.I.F("getting ownersMuted")
    var ownersMuted [][]byte
    if ownersMuted, err = s.SpiderFetch(
        kind.MuteList, ownersPubkeys...,
        kind.MuteList, dontFetch, ownersPubkeys...,
    ); chk.E(err) {
        return
    }
@@ -116,5 +120,14 @@ func (s *Server) Spider() (err error) {
    s.SetOwnersFollowed(ownersFollowed)
    s.SetFollowedFollows(followedFollows)
    s.SetOwnersMuted(ownersMuted)
    // lastly, update users' profile metadata and relay lists in the background
    if !dontFetch {
        go func() {
            everyone := append(ownersFollowed, followedFollows...)
            s.SpiderFetch(kind.ProfileMetadata, false, everyone...)
            s.SpiderFetch(kind.RelayListMetadata, false, everyone...)
            s.SpiderFetch(kind.DMRelaysList, false, everyone...)
        }()
    }
    return
}
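
The optional-bool plumbing at the top of Spider (noFetch ...bool collapsed into dontFetch) is a small reusable pattern; a hypothetical helper, not part of this commit:

func optBool(v ...bool) bool {
    // defaults to false when the caller passes nothing
    return len(v) > 0 && v[0]
}

Callers then read as s.Spider() for a full run with network fetches, and s.Spider(true) when the lists should be rebuilt from the local database only, which is what Publish does after saving a follow or mute list.
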
@@ -18,9 +18,9 @@ func JSONKey(dst, k []byte) (b []byte) {
    return
}

// UnmarshalHex takes a byte string that should contain a quoted hexadecimal encoded value,
// decodes it in-place using a SIMD hex codec and returns the decoded truncated bytes (the other
// half will be as it was but no allocation is required).
// UnmarshalHex takes a byte string that should contain a quoted hexadecimal
// encoded value, decodes it using a SIMD hex codec and returns the decoded
// bytes in a newly allocated buffer.
func UnmarshalHex(b []byte) (h []byte, rem []byte, err error) {
    rem = b[:]
    var inQuote bool
@@ -32,24 +32,28 @@ func UnmarshalHex(b []byte) (h []byte, rem []byte, err error) {
                start = i + 1
            }
        } else if b[i] == '"' {
            h = b[start:i]
            hexStr := b[start:i]
            rem = b[i+1:]
            break
            l := len(hexStr)
            if l%2 != 0 {
                err = errorf.E(
                    "invalid length for hex: %d, %0x",
                    len(hexStr), hexStr,
                )
                return
            }
            // Allocate a new buffer for the decoded data
            h = make([]byte, l/2)
            if err = xhex.Decode(h, hexStr); chk.E(err) {
                return
            }
            return
        }
    }
    if !inQuote {
        err = io.EOF
        return
    }
    l := len(h)
    if l%2 != 0 {
        err = errorf.E("invalid length for hex: %d, %0x", len(h), h)
        return
    }
    if err = xhex.Decode(h, h); chk.E(err) {
        return
    }
    h = h[:l/2]
    return
}
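
For readers without the SIMD codec handy, the new contract of UnmarshalHex (decode the quoted hex into a fresh buffer, hand back the remainder after the closing quote) can be mimicked with the standard library; a rough sketch, not the orly implementation:

package main

import (
    "encoding/hex"
    "errors"
    "fmt"
)

func parseQuotedHex(b []byte) (h, rem []byte, err error) {
    if len(b) == 0 || b[0] != '"' {
        return nil, b, errors.New("expected opening quote")
    }
    for i := 1; i < len(b); i++ {
        if b[i] == '"' {
            // hex.DecodeString rejects odd-length input, matching the
            // explicit length check in the diff above
            h, err = hex.DecodeString(string(b[1:i]))
            return h, b[i+1:], err
        }
    }
    return nil, b, errors.New("unterminated quote")
}

func main() {
    h, rem, err := parseQuotedHex([]byte(`"deadbeef",42`))
    fmt.Printf("%x %q %v\n", h, rem, err)
}
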
@@ -23,7 +23,7 @@ type HandlerWithSource struct {
var (
    // RestartRequested is set true after restart is requested.
    RestartRequested bool // = true
    requested atomic.Bool
    requested atomic.Bool

    // ch is used to receive SIGINT (Ctrl+C) signals.
    ch chan os.Signal
@@ -63,6 +63,8 @@ func Listener() {
        HandlersDone.Q()
        if RestartRequested {
            Restart()
        } else {
            os.Exit(0)
        }
    }
out:
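
A self-contained sketch of the restart-versus-exit decision this file implements, using only the standard library (names and structure here are illustrative, not orly's actual listener):

package main

import (
    "os"
    "os/signal"
    "sync/atomic"
)

var restartRequested atomic.Bool

func listener(restart func()) {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
    if restartRequested.Load() {
        // re-exec or otherwise restart the process here
        restart()
        return
    }
    os.Exit(0)
}

func main() {
    restartRequested.Store(true)
    listener(func() { /* restart hook would go here */ })
}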