fix maxlimit to appear in nip-11 and be properly enforced

add flatten flag for store so it flattens on request at shutdown if an import has taken place

import runs GC every 10000 events to limit fragmentation of tables

implement a specific count function that only decodes the event if there is an extra filter (which should be the minority of cases)

change search to use maps for indexes to automatically deduplicate in case of multiple queries matching the same field

improve logging to show origins of request/events in relay accept req/event

count now returns an approximate flag for the case of extra filters finding replaceable events
This commit is contained in:
2024-10-29 09:52:03 +00:00
parent e448b4a264
commit ce8f4add85
17 changed files with 207 additions and 74 deletions

View File

@@ -45,18 +45,17 @@ func (r *Relay) Init() (err E) {
return fmt.Sprintf("%v", ownerIds)
})
log.I.S(r.Owners)
r.CheckOwnerLists(context.Bg())
return nil
}
func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, authedPubkey B) bool {
func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, origin S,
authedPubkey B) bool {
// if the authenticator is enabled we require auth to accept events
if !r.AuthEnabled() {
log.I.F("auth not enabled")
return true
}
if len(authedPubkey) != 32 {
log.E.F("client not authed with auth required")
log.E.F("client not authed with auth required %s", origin)
return false
}
if len(r.Owners) > 0 {
@@ -70,8 +69,8 @@ func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, authedP
// are regenerated for the next AcceptReq/AcceptEvent
r.Followed = make(map[S]struct{})
r.Muted = make(map[S]struct{})
log.I.F("clearing owner follow/mute lists because of update from %0x",
evt.PubKey)
log.I.F("clearing owner follow/mute lists because of update from %s %0x",
origin, evt.PubKey)
return true
}
}
@@ -94,15 +93,16 @@ func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, authedP
}
// for all else, check the authed pubkey is in the follow list
for pk := range r.Followed {
// allow all events from follows of owners
if equals(authedPubkey, B(pk)) {
log.I.F("accepting event %0x because %0x on owner follow list",
evt.ID, B(pk))
return true
}
// todo: allow accepting events with p tag of a follow that the follow has not muted
// todo: this will allow outsiders to send messages to users
// todo: users will mute the user if they don't want to receive from this sender
}
// if the authed pubkey was not found, reject the request.
// log.I.F("authed pubkey %0x not found, rejecting event", authedPubkey)
// return false
}
// if auth is enabled and there is no moderators we just check that the pubkey
// has been loaded via the auth function.
@@ -167,7 +167,8 @@ func (r *Relay) CheckOwnerLists(c context.T) {
if dst, err = hex.DecAppend(dst, t.Value()); chk.E(err) {
continue
}
r.Followed[S(dst)] = struct{}{}
f := S(dst)
r.Followed[f] = struct{}{}
}
}
}
@@ -181,16 +182,9 @@ func (r *Relay) CheckOwnerLists(c context.T) {
Kinds: kinds.New(kind.MuteList)}); chk.E(err) {
}
// // preallocate sufficient elements
// var count int
// for _, ev := range evs {
// for _, t := range ev.Tags.F() {
// if equals(t.Key(), B{'p'}) {
// count++
// }
// }
// }
r.Muted = make(map[S]struct{})
mutes := "mutes(access blacklist),["
var first bool
for _, ev := range evs {
for _, t := range ev.Tags.F() {
if equals(t.Key(), B{'p'}) {
@@ -198,19 +192,26 @@ func (r *Relay) CheckOwnerLists(c context.T) {
if dst, err = hex.DecAppend(dst, t.Value()); chk.E(err) {
continue
}
r.Muted[S(dst)] = struct{}{}
if !first {
mutes += ","
} else {
first = true
}
m := S(dst)
mutes += `"` + m + `"`
r.Muted[m] = struct{}{}
}
}
}
o := "followed:\n"
for pk := range r.Followed {
o += fmt.Sprintf("%x,", pk)
o += fmt.Sprintf("%0x,", pk)
}
o += "\nmuted:\n"
for pk := range r.Muted {
o += fmt.Sprintf("%x,", pk)
o += fmt.Sprintf("%0x,", pk)
}
// log.T.F("%s\n", o)
log.D.F("%s\n", o)
}
}
}

View File

@@ -48,7 +48,7 @@ func main() {
r := &app.Relay{Config: cfg, Store: storage}
go app.MonitorResources(c)
var server *realy.Server
if server, err = realy.NewServer(c, cancel, r, path); chk.E(err) {
if server, err = realy.NewServer(c, cancel, r, path, ratel.DefaultMaxLimit); chk.E(err) {
os.Exit(1)
}
if err != nil {

View File

@@ -14,25 +14,30 @@ const maxLen = 500000000
// Import accepts an event
func (r *T) Import(rr io.Reader) {
r.Flatten = true
scan := bufio.NewScanner(rr)
buf := make(B, maxLen)
scan.Buffer(buf, maxLen)
var err E
var count N
for scan.Scan() {
b := scan.Bytes()
// if len(b) > 8192 {
// log.I.F("saving,%s", b)
// }
log.I.F("importing,%s", b)
ev := &event.T{}
if _, err = ev.UnmarshalJSON(b); chk.E(err) {
log.I.F("%s", b)
if _, err = ev.UnmarshalJSON(b); err != nil {
continue
}
if err = r.SaveEvent(r.Ctx, ev); chk.E(err) {
if err = r.SaveEvent(r.Ctx, ev); err != nil {
continue
}
log.I.F("saved,%0x", ev.ID)
count++
if count > 0 && count%10000 == 0 {
chk.T(r.DB.Sync())
log.I.F("imported 10000/%d events, running GC on new data", count)
chk.T(r.DB.RunValueLogGC(0.5))
}
}
err = scan.Err()
if chk.E(err) {

View File

@@ -3,16 +3,15 @@ package ratel
func (r *T) Close() (err E) {
// chk.E(r.DB.Sync())
log.I.F("closing database %s", r.Path())
if err = r.DB.Flatten(4); chk.E(err) {
return
if r.Flatten {
if err = r.DB.Flatten(4); chk.E(err) {
}
log.D.F("database flattened")
}
log.D.F("database flattened")
if err = r.seq.Release(); chk.E(err) {
return
}
log.D.F("database released")
if err = r.DB.Close(); chk.E(err) {
return
}
log.I.F("database closed")
return

View File

@@ -1,14 +1,121 @@
package ratel
import (
"errors"
"github.com/dgraph-io/badger/v4"
"realy.lol/event"
"realy.lol/filter"
"realy.lol/ratel/keys/createdat"
"realy.lol/ratel/keys/index"
"realy.lol/ratel/keys/serial"
"realy.lol/sha256"
"realy.lol/tag"
)
func (r *T) CountEvents(c Ctx, f *filter.T) (count N, err E) {
var evs []*event.T
evs, err = r.QueryEvents(c, f)
count = len(evs)
evs = nil
func (r *T) CountEvents(c Ctx, f *filter.T) (count N, approx bool, err E) {
log.T.F("QueryEvents,%s", f.Serialize())
var queries []query
var extraFilter *filter.T
var since uint64
if queries, extraFilter, since, err = PrepareQueries(f); chk.E(err) {
return
}
// search for the keys generated from the filter
for _, q := range queries {
select {
case <-c.Done():
return
default:
}
var eventKey B
err = r.View(func(txn *badger.Txn) (err E) {
// iterate only through keys and in reverse order
opts := badger.IteratorOptions{
Reverse: true,
}
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() {
item := it.Item()
k := item.KeyCopy(nil)
if !q.skipTS {
if len(k) < createdat.Len+serial.Len {
continue
}
createdAt := createdat.FromKey(k)
if createdAt.Val.U64() < since {
break
}
}
ser := serial.FromKey(k)
eventKey = index.Event.Key(ser)
// eventKeys = append(eventKeys, idx)
}
return
})
if chk.E(err) {
// this means shutdown, probably
if errors.Is(err, badger.ErrDBClosed) {
return
}
}
if extraFilter != nil {
// if there is an extra filter we need to fetch and decode the event to determine a
// match.
err = r.View(func(txn *badger.Txn) (err E) {
opts := badger.IteratorOptions{Reverse: true}
it := txn.NewIterator(opts)
defer it.Close()
for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() {
item := it.Item()
if r.HasL2 && item.ValueSize() == sha256.Size {
// we will count this though it may not match in fact. for general,
// simple filters there isn't likely to be an extrafilter anyway. the
// count result can have an "approximate" flag so we flip this now.
approx = true
return
}
ev := &event.T{}
var appr bool
if err = item.Value(func(eventValue []byte) (err E) {
var rem B
if rem, err = ev.UnmarshalBinary(eventValue); chk.E(err) {
ev = nil
eventValue = eventValue[:0]
return
}
if len(rem) > 0 {
log.T.S(rem)
}
if ev.Kind.IsReplaceable() ||
(ev.Kind.IsParameterizedReplaceable() &&
ev.Tags.GetFirst(tag.New("d")) != nil) {
// we aren't going to spend this extra time so this just flips the
// approximate flag. generally clients are asking for counts to get
// an outside estimate anyway, to avoid exceeding MaxLimit
appr = true
}
return
}); chk.E(err) {
continue
}
if ev == nil {
continue
}
if extraFilter.Matches(ev) {
count++
if appr {
approx = true
}
return
}
}
return
})
} else {
count++
}
}
return
}

View File

@@ -43,6 +43,9 @@ type T struct {
// ActuallyDelete sets whether we actually delete or rewrite deleted entries with a modified
// deleted prefix value (8th bit set)
ActuallyDelete bool
// Flatten should be set to true to trigger a flatten at close... this is mainly
// triggered by running an import
Flatten bool
}
var _ eventstore.I = (*T)(nil)

View File

@@ -26,8 +26,7 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
return
}
// search for the keys generated from the filter
// out:
var eventKeys [][]byte
eventKeys := make(map[S]struct{})
for _, q := range queries {
select {
case <-c.Done():
@@ -55,7 +54,8 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
}
ser := serial.FromKey(k)
idx := index.Event.Key(ser)
eventKeys = append(eventKeys, idx)
eventKeys[S(idx)] = struct{}{}
// eventKeys = append(eventKeys, idx)
}
return
})
@@ -66,17 +66,20 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
}
}
}
log.T.F("found %d event indexes", len(eventKeys))
select {
case <-c.Done():
return
default:
}
for _, eventKey := range eventKeys {
for ek := range eventKeys {
eventKey := B(ek)
select {
case <-c.Done():
return
default:
}
var done bool
err = r.View(func(txn *badger.Txn) (err E) {
opts := badger.IteratorOptions{Reverse: true}
it := txn.NewIterator(opts)
@@ -153,22 +156,22 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
}
if extraFilter == nil || extraFilter.Matches(ev) {
evMap[hex.Enc(ev.ID)] = ev
// evs = append(evs, ev)
if filter.Present(f.Limit) {
*f.Limit--
if *f.Limit <= 0 {
log.I.F("found events: %d", len(evs))
return
}
} else {
// if there is no limit, cap it at the MaxLimit, assume this was the
// intent or the client is erroneous, if any limit greater is
// requested this will be used instead as the previous clause.
if len(evMap) > r.MaxLimit {
log.I.F("found MaxLimit events: %d", len(evs))
log.I.F("found events: %d", len(evMap))
return
}
}
// if there is no limit, cap it at the MaxLimit, assume this was the
// intent or the client is erroneous, if any limit greater is
// requested this will be used instead as the previous clause.
if len(evMap) >= r.MaxLimit {
log.T.F("found MaxLimit events: %d", len(evMap))
done = true
return
}
}
}
return
@@ -179,6 +182,10 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
return
}
}
if done {
err = nil
return
}
select {
case <-c.Done():
return

View File

@@ -125,7 +125,7 @@ func (r *T) SaveEvent(c Ctx, ev *event.T) (err E) {
if err = txn.Set(counterKey, val); chk.E(err) {
return
}
log.D.F("saved event to ratel %s:\n%s", r.dataDir, ev.Serialize())
// log.D.F("saved event to ratel %s:\n%s", r.dataDir, ev.Serialize())
return
}); chk.E(err) {
return

View File

@@ -14,7 +14,8 @@ import (
var nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
// AddEvent has a business rule to add an event to the relayer
func AddEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, authedPubkey B) (accepted bool,
func AddEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, origin S,
authedPubkey B) (accepted bool,
message B) {
if ev == nil {
return false, normalize.Invalid.F("empty event")
@@ -24,7 +25,7 @@ func AddEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, authedPubkey B)
wrapper := &eventstore.RelayWrapper{I: store}
advancedSaver, _ := store.(relay.AdvancedSaver)
if !rl.AcceptEvent(c, ev, hr, authedPubkey) {
if !rl.AcceptEvent(c, ev, hr, origin, authedPubkey) {
return false, normalize.Blocked.F("event rejected by relay")
}

View File

@@ -49,6 +49,7 @@ func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) {
Nips: supportedNIPs,
Software: "https://realy.lol",
Version: version,
Limitation: ri.Limits{MaxLimit: s.maxLimit},
}
}
if err := json.NewEncoder(w).Encode(info); chk.E(err) {

View File

@@ -132,7 +132,7 @@ func (s *Server) doEvent(c Ctx, ws *web.Socket, req B, sto store.I) (msg B) {
log.I.F("extra '%s'", rem)
}
if !s.relay.AcceptEvent(c, env.T, ws.Req(), B(ws.Authed())) {
if !s.relay.AcceptEvent(c, env.T, ws.Req(), ws.RealRemote(), B(ws.Authed())) {
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
if !ws.AuthRequested() {
@@ -281,7 +281,7 @@ func (s *Server) doEvent(c Ctx, ws *web.Socket, req B, sto store.I) (msg B) {
}
// if the event is a delete we still want to save it.
}
ok, reason := AddEvent(c, s.relay, env.T, ws.Req(), B(ws.Authed()))
ok, reason := AddEvent(c, s.relay, env.T, ws.Req(), ws.RealRemote(), B(ws.Authed()))
if err = okenvelope.NewFrom(env.ID, ok, reason).Write(ws); chk.E(err) {
return
}
@@ -337,6 +337,7 @@ func (s *Server) doCount(c context.Context, ws *web.Socket, req B,
}
var total N
var approx bool
for _, f := range env.Filters.F {
// prevent kind-4 events from being returned to unauthed users, only when
// authentication is a thing
@@ -370,7 +371,7 @@ func (s *Server) doCount(c context.Context, ws *web.Socket, req B,
}
}
var count N
count, err = counter.CountEvents(c, f)
count, approx, err = counter.CountEvents(c, f)
if err != nil {
log.E.F("store: %v", err)
continue
@@ -379,7 +380,7 @@ func (s *Server) doCount(c context.Context, ws *web.Socket, req B,
}
var res *countenvelope.Response
if res, err = countenvelope.NewResponseFrom(env.ID.String(), N(total),
false); chk.E(err) {
approx); chk.E(err) {
return
}
if err = res.Write(ws); chk.E(err) {
@@ -461,10 +462,10 @@ func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) {
"this realy does not serve kind-4 to unauthenticated users," +
" does your client implement NIP-42?")
return notice
case senders.Contains(ws.AuthedBytes()) || receivers.ContainsAny(B("#p"),
tag.New(ws.AuthedBytes())):
log.T.F("user %0x allowed to query for DM", ws.AuthedBytes())
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
case senders.Contains(ws.AuthedBytes()) ||
receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes())):
log.T.F("user %0x from %s allowed to query for privileged event",
ws.AuthedBytes(), ws.RealRemote())
default:
// restricted filter: do not return any events,
// even if other elements in filters array were not restricted).
@@ -475,8 +476,7 @@ func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) {
}
}
var events []*event.T
events, err = sto.QueryEvents(c, f)
if err != nil {
if events, err = sto.QueryEvents(c, f); err != nil {
log.E.F("eventstore: %v", err)
if errors.Is(err, badger.ErrDBClosed) {
return

View File

@@ -38,13 +38,15 @@ type Server struct {
serveMux *http.ServeMux
httpServer, adminServer *http.Server
authRequired bool
maxLimit N
}
func (s *Server) Router() *http.ServeMux { return s.serveMux }
// NewServer initializes the realy and its storage using their respective Init methods,
// returning any non-nil errors, and returns a Server ready to listen for HTTP requests.
func NewServer(c Ctx, cancel context.F, rl relay.I, dbPath S, opts ...Option) (*Server, E) {
func NewServer(c Ctx, cancel context.F, rl relay.I, dbPath S, maxLimit N,
opts ...Option) (*Server, E) {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
@@ -61,6 +63,7 @@ func NewServer(c Ctx, cancel context.F, rl relay.I, dbPath S, opts ...Option) (*
serveMux: http.NewServeMux(),
options: options,
authRequired: authRequired,
maxLimit: maxLimit,
}
if storage := rl.Storage(context.Bg()); storage != nil {

View File

@@ -8,9 +8,12 @@ import (
"github.com/gobwas/ws/wsutil"
"realy.lol/context"
"realy.lol/ratel"
"realy.lol/ws"
)
// todo: this needs updating
func TestServerStartShutdown(t *testing.T) {
var (
inited bool
@@ -28,7 +31,7 @@ func TestServerStartShutdown(t *testing.T) {
init: func() E { storeInited = true; return nil },
},
}
srv, _ := NewServer(context.Bg(), rl, "")
srv, _ := NewServer(context.Bg(), nil, rl, "", ratel.DefaultMaxLimit)
ready := make(chan bool)
done := make(chan E)
go func() {

View File

@@ -11,6 +11,8 @@ import (
eventstore "realy.lol/store"
)
// todo: this needs updating
func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server {
t.Helper()
srv, _ := NewServer(c, tr, "")
@@ -30,7 +32,7 @@ type testRelay struct {
func (tr *testRelay) Name() S { return tr.name }
func (tr *testRelay) Storage(context.T) eventstore.I { return tr.storage }
func (tr *testRelay) Origin() S { return "example.com" }
func (tr *testRelay) Init() E {
if fn := tr.init; fn != nil {
return fn()

View File

@@ -32,7 +32,7 @@ type I interface {
// messages, that are not on the mute list, that do not yet have a reply, should accept
// direct and group message events until there is three and thereafter will be restricted
// until the user adds them to their follow list.
AcceptEvent(c Ctx, ev *event.T, hr *http.Request, authedPubkey B) bool
AcceptEvent(c Ctx, ev *event.T, hr *http.Request, origin S, authedPubkey B) bool
// Storage returns the realy storage implementation.
Storage(Ctx) store.I
}
@@ -105,5 +105,5 @@ type AdvancedSaver interface {
}
type EventCounter interface {
CountEvents(c Ctx, f *filter.T) (count N, err E)
CountEvents(c Ctx, f *filter.T) (count N, approx bool, err E)
}

View File

@@ -50,7 +50,7 @@ func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) {
if ev.CreatedAt.Int() > evt.CreatedAt.Int() {
return errorf.W(S(normalize.Invalid.F("not replacing newer event")))
}
log.I.F("%s\nreplacing\n%s", evt.Serialize(), ev.Serialize())
log.T.F("%s\nreplacing\n%s", evt.Serialize(), ev.Serialize())
if err = w.I.DeleteEvent(c, ev.EventID()); chk.E(err) {
continue
}
@@ -63,7 +63,8 @@ func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) {
f := filter.New()
f.Authors = tag.New(evt.PubKey)
f.Kinds = kinds.New(evt.Kind)
log.I.F("filter for parameterized replaceable %v %s", f.Tags.ToStringSlice(), f.Serialize())
log.I.F("filter for parameterized replaceable %v %s", f.Tags.ToStringSlice(),
f.Serialize())
evs, err = w.I.QueryEvents(c, f)
if err != nil {
return fmt.Errorf("failed to query before replacing: %w", err)

View File

@@ -27,7 +27,7 @@ type I interface {
QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E)
// CountEvents performs the same work as QueryEvents but instead of delivering the events
// that were found it just returns the count of events
CountEvents(c Ctx, f *filter.T) (count N, err E)
CountEvents(c Ctx, f *filter.T) (count N, approx bool, err E)
// DeleteEvent is used to handle deletion events, as per NIP-09.
DeleteEvent(c Ctx, ev *eventid.T) (err E)
// SaveEvent is called once Relay.AcceptEvent reports true.