now doesn't crash on bulk upload but big speed problem

This commit is contained in:
2024-10-25 08:26:35 +01:00
parent a13f9bbc46
commit 1958658ecb
23 changed files with 148 additions and 58 deletions

View File

@@ -78,7 +78,7 @@ func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, authedP
// they come from a pubkey that is on the follow list.
for pk := range r.Muted {
if equals(evt.PubKey, B(pk)) {
log.I.F("rejecting event with pubkey %v because on owner mute list",
log.I.F("rejecting event with pubkey %0x because on owner mute list",
evt.PubKey)
return false
}
@@ -86,7 +86,8 @@ func (r *Relay) AcceptEvent(c context.T, evt *event.T, hr *http.Request, authedP
// for all else, check the authed pubkey is in the follow list
for pk := range r.Followed {
if equals(authedPubkey, B(pk)) {
log.I.F("accepting event %0x because on owner follow list", evt.ID)
log.I.F("accepting event %0x because %0x on owner follow list",
evt.ID, B(pk))
return true
}
}

View File

@@ -2,10 +2,13 @@ package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"github.com/pkg/profile"
"realy.lol/cmd/realy/app"
"realy.lol/context"
"realy.lol/interrupt"
@@ -16,6 +19,12 @@ import (
)
func main() {
defer profile.Start(profile.MemProfile).Stop()
go func() {
http.ListenAndServe(":8080", nil)
}()
var err E
var cfg *app.Config
if cfg, err = app.NewConfig(); err != nil || app.HelpRequested() {
@@ -38,15 +47,15 @@ func main() {
r := &app.Relay{Config: cfg, Store: storage}
go app.MonitorResources(c)
var server *realy.Server
if server, err = realy.NewServer(r, path); chk.E(err) {
if server, err = realy.NewServer(c, cancel, r, path); chk.E(err) {
os.Exit(1)
}
if err != nil {
log.F.F("failed to create server: %v", err)
}
interrupt.AddHandler(func() { server.Shutdown(c) })
interrupt.AddHandler(func() { server.Shutdown() })
if err = server.Start(cfg.Listen, cfg.Port, cfg.AdminListen, cfg.AdminPort); chk.E(err) {
log.F.F("server terminated: %v", err)
}
cancel()
// cancel()
}

View File

@@ -157,9 +157,8 @@ func (w *Writer) WriteTags(t *tags.T) (err E) {
switch {
case secondIsHex:
w.Buf = appendUvarint(w.Buf, uint64(32))
if w.Buf, err = hex.DecAppend(w.Buf, ts); chk.E(err) {
if w.Buf, err = hex.DecAppend(w.Buf, ts); err != nil {
// the value MUST be hex by the spec
log.W.Ln(t.N(i))
return
}
continue scanning
@@ -169,7 +168,7 @@ func (w *Writer) WriteTags(t *tags.T) (err E) {
// first is 2 bytes size
var n int
k := kind.New(0)
if _, err = k.UnmarshalJSON(split[0]); chk.E(err) {
if _, err = k.UnmarshalJSON(split[0]); err != nil {
return
}
if len(split) > 1 {
@@ -232,7 +231,7 @@ func (w *Writer) WriteEvent(ev *T) (err error) {
if err = w.WriteKind(ev.Kind); chk.E(err) {
return
}
if err = w.WriteTags(ev.Tags); chk.E(err) {
if err = w.WriteTags(ev.Tags); err != nil {
return
}
if err = w.WriteContent(ev.Content); chk.E(err) {

View File

@@ -93,6 +93,9 @@ func (r *Reader) ReadTags() (t *tags.T, err error) {
nTags := int(vi)
var end int
r.Pos += read
// if nTags > 500 {
// log.I.F("new tags with %d elements (follow list probably)", nTags)
// }
t = tags.NewWithCap(nTags)
// t = &tags.T{T: make([]*tag.T, nTags)}
// t = make(tags.T, nTags)
@@ -105,6 +108,7 @@ func (r *Reader) ReadTags() (t *tags.T, err error) {
}
lenTag := int(vi)
r.Pos += read
// log.I.F("adding capacity %d at tag %d", lenTag, i)
t.AddCap(i, lenTag)
// t.T[i] = tag.NewWithCap(lenTag)
// extract the individual tag strings
@@ -172,11 +176,12 @@ func (r *Reader) ReadTags() (t *tags.T, err error) {
r.Pos = fieldEnd
t.AppendTo(i, B(fmt.Sprintf("%d:%0x:%s",
k, hex.Enc(pk), string(r.Buf[r.Pos:end]))))
r.Pos = end
// t.N(i).Field = append(t.N(i).Field, B(fmt.Sprintf("%d:%0x:%s",
// k,
// hex.Enc(pk),
// string(r.Buf[r.Pos:end]))))
r.Pos = end
continue reading
}
}
t.AppendTo(i, r.Buf[r.Pos:r.Pos+int(vi)])

View File

@@ -43,9 +43,9 @@ type T struct {
func New() (f *T) {
return &T{
IDs: tag.NewWithCap(100),
IDs: tag.NewWithCap(10),
Kinds: kinds.NewWithCap(10),
Authors: tag.NewWithCap(100),
Authors: tag.NewWithCap(10),
Tags: tags.New(),
Since: timestamp.New(),
Until: timestamp.New(),

3
go.mod
View File

@@ -13,6 +13,7 @@ require (
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/klauspost/cpuid/v2 v2.2.8
github.com/kopoli/go-terminal-size v0.0.0-20170219200355-5c97524c8b54
github.com/pkg/profile v1.7.0
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/rakyll/globalconf v0.0.0-20180912185831-87f8127c421f
github.com/rs/cors v1.11.1
@@ -36,12 +37,14 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/fgprof v0.9.3 // indirect
github.com/glacjay/goini v0.0.0-20161120062552-fd3024d87ee2 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect

11
go.sum
View File

@@ -13,6 +13,9 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -34,6 +37,8 @@ github.com/fasthttp/websocket v1.5.10 h1:bc7NIGyrg1L6sd5pRzCIbXpro54SZLEluZCu0rO
github.com/fasthttp/websocket v1.5.10/go.mod h1:BwHeuXGWzCW1/BIKUKD3+qfCl+cTdsHu/f243NcAI/Q=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
github.com/glacjay/goini v0.0.0-20161120062552-fd3024d87ee2 h1:+SEORW3KptcFnlhTbn7N0drG3AFnrcmBDWDyQ3Bt06o=
github.com/glacjay/goini v0.0.0-20161120062552-fd3024d87ee2/go.mod h1:1vW2LGZb8uLSqmYBOdxvhiwATuLtmyUTMezM3cHrIHQ=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
@@ -70,7 +75,10 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -88,6 +96,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -164,6 +174,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@@ -44,6 +44,9 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
@@ -84,6 +87,7 @@ github.com/graph-gophers/dataloader/v7 v7.1.0 h1:Wn8HGF/q7MNXcvfaBnLEPEFJttVHR8z
github.com/graph-gophers/dataloader/v7 v7.1.0/go.mod h1:1bKE0Dm6OUcTB/OAuYVOZctgIz7Q3d0XrYtlIzTgg6Q=
github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4=
github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d h1:uGg2frlt3IcT7kbV6LEp5ONv4vmoO2FW4qSO+my/aoM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
@@ -147,4 +151,3 @@ google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@@ -36,7 +36,7 @@ func DecAppend(dst, src B) (b B, err error) {
l := len(dst)
b = dst
b = append(b, make(B, len(src)/2)...)
if err = xhex.Decode(b[l:], src); chk.E(err) {
if err = xhex.Decode(b[l:], src); err != nil {
return
}
return

View File

@@ -67,7 +67,7 @@ func (k *T) MarshalJSON(dst B) (b B, err E) { return ints.New(k.ToU64()).Marshal
func (k *T) UnmarshalJSON(b B) (r B, err E) {
n := ints.New(0)
if r, err = n.UnmarshalJSON(b); chk.E(err) {
if r, err = n.UnmarshalJSON(b); err != nil {
return
}
k.K = n.Uint16()

View File

@@ -5,6 +5,7 @@ import (
"io"
"github.com/dgraph-io/badger/v4"
"realy.lol/context"
"realy.lol/event"
"realy.lol/ratel/keys/index"
)
@@ -19,11 +20,12 @@ func (r *T) Import(rr io.Reader) {
var err E
for scan.Scan() {
b := scan.Bytes()
if len(b) > 8192 {
log.I.F("saving,%s", b)
}
// if len(b) > 8192 {
// log.I.F("saving,%s", b)
// }
ev := &event.T{}
if _, err = ev.UnmarshalJSON(b); chk.E(err) {
log.I.F("%s", b)
continue
}
if err = r.SaveEvent(r.Ctx, ev); err != nil {
@@ -36,13 +38,18 @@ func (r *T) Import(rr io.Reader) {
return
}
func (r *T) Export(w io.Writer) {
func (r *T) Export(c context.T, w io.Writer) {
var counter int
err := r.View(func(txn *badger.Txn) (err error) {
it := txn.NewIterator(badger.IteratorOptions{Prefix: index.Event.Key()})
defer it.Close()
var started bool
for it.Rewind(); it.Valid(); it.Next() {
select {
case <-r.Ctx.Done():
return
default:
}
item := it.Item()
if started {
_, _ = w.Write(B{'\n'})
@@ -64,8 +71,9 @@ func (r *T) Export(w io.Writer) {
log.T.S(rem)
}
if _, err = w.Write(ev.Serialize()); chk.E(err) {
err = nil
continue
// err = nil
// continue
return
}
counter++
}

View File

@@ -1,7 +1,7 @@
package ratel
func (r *T) Close() (err E) {
chk.E(r.DB.Sync())
// chk.E(r.DB.Sync())
log.I.F("closing database %s", r.Path())
if err = r.DB.Flatten(4); chk.E(err) {
return

View File

@@ -77,6 +77,7 @@ func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
// parts
prf, elems := index.P(0), []keys.Element(nil)
if prf, elems, err = GetTagKeyElements(S(t.F()[1]), CA, ser); chk.E(err) {
log.I.F("%v", t.ToStringSlice())
return
}
k := prf.Key(elems...)

View File

@@ -32,6 +32,8 @@ func GetTagKeyElements(tagValue string, CA *createdat.T,
}
prf, elems = index.Tag32, keys.Make(pkk, ser)
return
} else {
err = nil
}
}
// check for a tag

View File

@@ -1,6 +1,8 @@
package ratel
import (
"errors"
"github.com/dgraph-io/badger/v4"
"realy.lol/event"
"realy.lol/filter"
@@ -24,6 +26,11 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
// search for the keys generated from the filter
var eventKeys [][]byte
for _, q := range queries {
select {
case <-c.Done():
return
default:
}
err = r.View(func(txn *badger.Txn) (err E) {
// iterate only through keys and in reverse order
opts := badger.IteratorOptions{
@@ -52,11 +59,24 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
return
})
if chk.E(err) {
// this can't actually happen because the View function above does not set err.
// this means shutdown, probably
if errors.Is(err, badger.ErrDBClosed) {
return
}
}
select {
case <-c.Done():
return
default:
}
search:
for _, eventKey := range eventKeys {
var v B
select {
case <-c.Done():
return
default:
}
err = r.View(func(txn *badger.Txn) (err E) {
opts := badger.IteratorOptions{Reverse: true}
it := txn.NewIterator(opts)
@@ -66,10 +86,11 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
item := it.Item()
// k := item.KeyCopy(nil)
// log.T.S(k)
if v, err = item.ValueCopy(nil); chk.E(err) {
continue
}
if r.HasL2 && len(v) == sha256.Size {
// if v, err = item.ValueCopy(nil); chk.E(err) {
// continue
// }
if r.HasL2 && item.ValueSize() == sha256.Size {
// this is a stub entry that indicates an L2 needs to be accessed for it, so
// we populate only the event.T.ID and return the result, the caller will
// expect this as a signal to query the L2 event store.
@@ -90,9 +111,20 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
}
return
})
if err != nil {
// this means shutdown, probably
if errors.Is(err, badger.ErrDBClosed) {
return
}
}
if v == nil {
continue
}
select {
case <-c.Done():
return
default:
}
ev := &event.T{}
var rem B
if rem, err = ev.UnmarshalBinary(v); chk.E(err) {
@@ -148,6 +180,7 @@ func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
}
}
}
// todo: this should filter out mutes
// if len(evs) > 0 {
// log.T.C(func() (o string) {
// o = "sending events\n"

View File

@@ -96,7 +96,7 @@ func (r *T) SaveEvent(c Ctx, ev *event.T) (err E) {
return
}
var bin B
if bin, err = ev.MarshalBinary(bin); chk.E(err) {
if bin, err = ev.MarshalBinary(bin); err != nil {
return
}
// otherwise, save new event record.

View File

@@ -14,7 +14,7 @@ func (s *Server) HandleAdmin(w http.ResponseWriter, r *http.Request) {
case "/export":
log.I.F("export of event data requested on admin port")
store := s.relay.Storage(context.Bg())
store.Export(w)
store.Export(s.Ctx, w)
case "/import":
log.I.F("import of event data requested on admin port %s", r.RequestURI)
store := s.relay.Storage(context.Bg())
@@ -23,6 +23,6 @@ func (s *Server) HandleAdmin(w http.ResponseWriter, r *http.Request) {
case "/shutdown":
fmt.Fprintf(w, "shutting down")
defer r.Body.Close()
s.Shutdown(context.Bg())
s.Shutdown()
}
}

View File

@@ -4,11 +4,13 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"net/http"
"sort"
"time"
"github.com/dgraph-io/badger/v4"
"github.com/fasthttp/websocket"
"golang.org/x/time/rate"
"realy.lol/auth"
@@ -458,7 +460,8 @@ func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) {
"this realy does not serve kind-4 to unauthenticated users," +
" does your client implement NIP-42?")
return notice
case senders.Contains(ws.AuthedBytes()) || receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes())):
case senders.Contains(ws.AuthedBytes()) || receivers.ContainsAny(B("#p"),
tag.New(ws.AuthedBytes())):
log.T.F("user %0x allowed to query for DM", ws.AuthedBytes())
// allowed filter: ws.authed is sole receiver (filter specifies one or all senders)
default:
@@ -474,6 +477,9 @@ func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) {
events, err = sto.QueryEvents(c, f)
if err != nil {
log.E.F("eventstore: %v", err)
if errors.Is(err, badger.ErrDBClosed) {
return
}
continue
}

View File

@@ -28,6 +28,8 @@ import (
// The basic usage is to call Start or StartConf, which starts serving immediately.
// For a more fine-grained control, use NewServer.
type Server struct {
Ctx
Cancel context.F
options *Options
relay relay.I
clientsMu sync.Mutex
@@ -38,13 +40,11 @@ type Server struct {
authRequired bool
}
func (s *Server) Router() *http.ServeMux {
return s.serveMux
}
func (s *Server) Router() *http.ServeMux { return s.serveMux }
// NewServer initializes the realy and its storage using their respective Init methods,
// returning any non-nil errors, and returns a Server ready to listen for HTTP requests.
func NewServer(rl relay.I, dbPath S, opts ...Option) (*Server, E) {
func NewServer(c Ctx, cancel context.F, rl relay.I, dbPath S, opts ...Option) (*Server, E) {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
@@ -54,6 +54,8 @@ func NewServer(rl relay.I, dbPath S, opts ...Option) (*Server, E) {
authRequired = ar.AuthEnabled()
}
srv := &Server{
Ctx: c,
Cancel: cancel,
relay: rl,
clients: make(map[*websocket.Conn]struct{}),
serveMux: http.NewServeMux(),
@@ -150,8 +152,10 @@ func (s *Server) Start(host S, port int, adminHost S, adminPort int, started ...
// If the realy is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is.
// Note that the HTTP server make some time to shutdown and so the context deadline,
// if any, may have been shortened by the time OnShutdown is called.
func (s *Server) Shutdown(c context.T) {
func (s *Server) Shutdown() {
c := s.Ctx
log.I.Ln("shutting down relay")
s.Cancel()
s.clientsMu.Lock()
defer s.clientsMu.Unlock()
for conn := range s.clients {

View File

@@ -1,13 +1,13 @@
package realy
import (
"context"
"errors"
"net/http"
"testing"
"time"
"github.com/gobwas/ws/wsutil"
"realy.lol/context"
"realy.lol/ws"
)
@@ -23,15 +23,19 @@ func TestServerStartShutdown(t *testing.T) {
inited = true
return nil
},
onShutdown: func(context.Context) { shutdown = true },
onShutdown: func(context.T) { shutdown = true },
storage: &testStorage{
init: func() E { storeInited = true; return nil },
},
}
srv, _ := NewServer(rl, "")
srv, _ := NewServer(context.Bg(), rl, "")
ready := make(chan bool)
done := make(chan E)
go func() { done <- srv.Start("127.0.0.1", 0, ready); close(done) }()
go func() {
done <- srv.Start("127.0.0.1", 0,
"127.0.0.1", 9999, ready)
close(done)
}()
<-ready
// verify everything's initialized
@@ -48,7 +52,7 @@ func TestServerStartShutdown(t *testing.T) {
}
// verify server shuts down
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.Timeout(context.Bg(), 3*time.Second)
defer cancel()
srv.Shutdown(ctx)
if !shutdown {
@@ -66,10 +70,10 @@ func TestServerStartShutdown(t *testing.T) {
func TestServerShutdownWebsocket(t *testing.T) {
// set up a new relay server
srv := startTestRelay(t, &testRelay{storage: &testStorage{}})
srv := startTestRelay(context.Bg(), t, &testRelay{storage: &testStorage{}})
// connect a client to it
ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx1, cancel := context.Timeout(context.Bg(), 2*time.Second)
defer cancel()
client, err := ws.RelayConnect(ctx1, "ws://"+srv.Addr)
if err != nil {
@@ -77,7 +81,7 @@ func TestServerShutdownWebsocket(t *testing.T) {
}
// now, shut down the server
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx2, cancel := context.Timeout(context.Bg(), 2*time.Second)
defer cancel()
srv.Shutdown(ctx2)

View File

@@ -11,11 +11,11 @@ import (
eventstore "realy.lol/store"
)
func startTestRelay(t *testing.T, tr *testRelay) *Server {
func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server {
t.Helper()
srv, _ := NewServer(tr, "")
srv, _ := NewServer(c, tr, "")
started := make(chan bool)
go srv.Start("127.0.0.1", 0, started)
go srv.Start("127.0.0.1", 0, "127.0.0.1", 0, started)
<-started
return srv
}
@@ -71,7 +71,7 @@ func (st *testStorage) Path() eventstore.S {
panic("implement me")
}
func (st *testStorage) Init(path S) E {
func (st *testStorage) Init() E {
if fn := st.init; fn != nil {
return fn()
}
@@ -85,23 +85,23 @@ func (st *testStorage) Close() (err E) {
return
}
func (st *testStorage) QueryEvents(ctx context.T, f *filter.T) (evs []*event.T, err E) {
func (st *testStorage) QueryEvents(c context.T, f *filter.T) (evs []*event.T, err E) {
if fn := st.queryEvents; fn != nil {
return fn(ctx, f)
return fn(c, f)
}
return nil, nil
}
func (st *testStorage) DeleteEvent(ctx context.T, evt *eventid.T) E {
func (st *testStorage) DeleteEvent(c context.T, evt *eventid.T) E {
if fn := st.deleteEvent; fn != nil {
return fn(ctx, evt)
return fn(c, evt)
}
return nil
}
func (st *testStorage) SaveEvent(ctx context.T, e *event.T) E {
func (st *testStorage) SaveEvent(c context.T, e *event.T) E {
if fn := st.saveEvent; fn != nil {
return fn(ctx, e)
return fn(c, e)
}
return nil
}

View File

@@ -35,7 +35,7 @@ type I interface {
// Import reads in a stream of line structured JSON of events to save into the store.
Import(r io.Reader)
// Export writes a stream of line structured JSON of all events in the store.
Export(w io.Writer)
Export(c Ctx, w io.Writer)
// Sync signals the event store to flush its buffers.
Sync() (err E)
}

View File

@@ -74,10 +74,11 @@ func (t *T) AddCap(i, c int) (tt *T) {
fmt.Fprint(os.Stderr, lol.GetNLoc(7))
return t
}
for len(t.t) <= i {
t.t = append(t.t, tag.NewWithCap(c))
n := i - len(t.t) + 1
for range n {
t.t = append(t.t, &tag.T{})
}
t.t[i] = tag.NewWithCap(c)
return t
}