From 23a57eb1c5e7d36dfcb231eb4d8b2f1cc5194fc1 Mon Sep 17 00:00:00 2001 From: mleku Date: Sun, 8 Sep 2024 20:16:34 +0100 Subject: [PATCH] add the files from prior work --- .gitignore | 39 +++++ alias.go | 15 ++ errors.go | 8 + go.mod | 45 ++++++ go.sum | 193 +++++++++++++++++++++++++ helpers.go | 12 ++ ratel/close.go | 22 +++ ratel/countevents.go | 14 ++ ratel/del/del.go | 9 ++ ratel/deleteevent.go | 79 ++++++++++ ratel/getecounterkey.go | 14 ++ ratel/getindexkeysforevent.go | 93 ++++++++++++ ratel/gettagkeyelements.go | 69 +++++++++ ratel/gettagkeyprefix.go | 56 +++++++ ratel/init.go | 98 +++++++++++++ ratel/keys/arb/arb.go | 85 +++++++++++ ratel/keys/arb/arb_test.go | 22 +++ ratel/keys/count/count.go | 45 ++++++ ratel/keys/createdat/createdat.go | 47 ++++++ ratel/keys/createdat/createdat_test.go | 25 ++++ ratel/keys/id/id.go | 58 ++++++++ ratel/keys/id/id_test.go | 24 +++ ratel/keys/index/index.go | 48 ++++++ ratel/keys/index/index_test.go | 19 +++ ratel/keys/index/prefixes.go | 141 ++++++++++++++++++ ratel/keys/keys.go | 43 ++++++ ratel/keys/keys_test.go | 127 ++++++++++++++++ ratel/keys/kinder/kind.go | 43 ++++++ ratel/keys/kinder/kind_test.go | 21 +++ ratel/keys/pubkey/pubkey.go | 75 ++++++++++ ratel/keys/pubkey/pubkey_test.go | 28 ++++ ratel/keys/serial/serial.go | 81 +++++++++++ ratel/keys/serial/serial_test.go | 22 +++ ratel/log.go | 64 ++++++++ ratel/main.go | 125 ++++++++++++++++ ratel/nuke.go | 28 ++++ ratel/preparequeries.go | 184 +++++++++++++++++++++++ ratel/queryevents.go | 158 ++++++++++++++++++++ ratel/saveevent.go | 129 +++++++++++++++++ relay_interface.go | 90 ++++++++++++ store_interface.go | 43 ++++++ utils.go | 37 +++++ 42 files changed, 2578 insertions(+) create mode 100644 .gitignore create mode 100644 alias.go create mode 100644 errors.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 helpers.go create mode 100644 ratel/close.go create mode 100644 ratel/countevents.go create mode 100644 ratel/del/del.go create mode 100644 ratel/deleteevent.go create mode 100644 ratel/getecounterkey.go create mode 100644 ratel/getindexkeysforevent.go create mode 100644 ratel/gettagkeyelements.go create mode 100644 ratel/gettagkeyprefix.go create mode 100644 ratel/init.go create mode 100644 ratel/keys/arb/arb.go create mode 100644 ratel/keys/arb/arb_test.go create mode 100644 ratel/keys/count/count.go create mode 100644 ratel/keys/createdat/createdat.go create mode 100644 ratel/keys/createdat/createdat_test.go create mode 100644 ratel/keys/id/id.go create mode 100644 ratel/keys/id/id_test.go create mode 100644 ratel/keys/index/index.go create mode 100644 ratel/keys/index/index_test.go create mode 100644 ratel/keys/index/prefixes.go create mode 100644 ratel/keys/keys.go create mode 100644 ratel/keys/keys_test.go create mode 100644 ratel/keys/kinder/kind.go create mode 100644 ratel/keys/kinder/kind_test.go create mode 100644 ratel/keys/pubkey/pubkey.go create mode 100644 ratel/keys/pubkey/pubkey_test.go create mode 100644 ratel/keys/serial/serial.go create mode 100644 ratel/keys/serial/serial_test.go create mode 100644 ratel/log.go create mode 100644 ratel/main.go create mode 100644 ratel/nuke.go create mode 100644 ratel/preparequeries.go create mode 100644 ratel/queryevents.go create mode 100644 ratel/saveevent.go create mode 100644 relay_interface.go create mode 100644 store_interface.go create mode 100644 utils.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5bef01e --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +# Allowlisting gitignore template for GO projects prevents us +# from adding various unwanted local files, such as generated +# files, developer configurations or IDE-specific files etc. +# +# Recommended: Go.AllowList.gitignore + +# Ignore everything +* +/.idea/ +.replicatr +# But not these files... +!/.gitignore +!*.go +!go.sum +!go.mod +!README.md +!LICENSE +!*.sh +!Makefile +!*.json +!*.pdf +!*.csv +!*.py +!*.mediawiki +!*.did +!*.rs +!*.toml +!*.png +!*.svg +!*.md +!*.txt +!*.jsonl +!*.bin +!*.tmpl +!.gitmodules +!*.yml +!*.yaml +# ...even if they are in subdirectories +!*/ diff --git a/alias.go b/alias.go new file mode 100644 index 0000000..e0f611e --- /dev/null +++ b/alias.go @@ -0,0 +1,15 @@ +package eventstore + +import ( + "net/http" + "nostr.mleku.dev/protocol/ws" + + "nostr.mleku.dev/codec/envelopes/okenvelope" + "nostr.mleku.dev/codec/subscriptionid" +) + +type SubID = subscriptionid.T +type WS = *ws.Serv +type Responder = http.ResponseWriter +type Req = *http.Request +type OK = okenvelope.T diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..8f84c1e --- /dev/null +++ b/errors.go @@ -0,0 +1,8 @@ +package eventstore + +import "errors" + +var ( + ErrDupEvent = errors.New("duplicate: event already exists") + ErrEventNotExists = errors.New("unknown: event not known by any source of this relay") +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f4bb25a --- /dev/null +++ b/go.mod @@ -0,0 +1,45 @@ +module store.mleku.dev + +go 1.23.0 + +require ( + ec.mleku.dev/v2 v2.3.5 + github.com/dgraph-io/badger/v4 v4.3.0 + github.com/minio/sha256-simd v1.0.1 + lukechampine.com/frand v1.4.2 + nostr.mleku.dev v1.0.14 + util.mleku.dev v1.0.4 +) + +require ( + github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/fasthttp/websocket v1.5.10 // indirect + github.com/fatih/color v1.17.0 // indirect + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/gobwas/ws v1.4.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/flatbuffers v1.12.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect + github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 // indirect + github.com/templexxx/cpu v0.1.1 // indirect + github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.55.0 // indirect + go.opencensus.io v0.24.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..76d941f --- /dev/null +++ b/go.sum @@ -0,0 +1,193 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +ec.mleku.dev/v2 v2.3.5 h1:95cTJn4EemxzvdejpdZSse6w79pYx1Ty3nL2zDLKqQU= +ec.mleku.dev/v2 v2.3.5/go.mod h1:4hK39Si4F2Aav4H4jBtzSUR7xlFxeuS4pPK3t0Ol8VQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= +github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.3.0 h1:lcsCE1/1qrRhqP+zYx6xDZb8n7U+QlwNicpc676Ub40= +github.com/dgraph-io/badger/v4 v4.3.0/go.mod h1:Sc0T595g8zqAQRDf44n+z3wG4BOqLwceaFntt8KPxUM= +github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91 h1:Pux6+xANi0I7RRo5E1gflI4EZ2yx3BGZ75JkAIvGEOA= +github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91/go.mod h1:swkazRqnUf1N62d0Nutz7KIj2UKqsm/H8tD0nBJAXqM= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fasthttp/websocket v1.5.10 h1:bc7NIGyrg1L6sd5pRzCIbXpro54SZLEluZCu0rOpcN4= +github.com/fasthttp/websocket v1.5.10/go.mod h1:BwHeuXGWzCW1/BIKUKD3+qfCl+cTdsHu/f243NcAI/Q= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs= +github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc= +github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= +github.com/templexxx/cpu v0.1.1 h1:isxHaxBXpYFWnk2DReuKkigaZyrjs2+9ypIdGP4h+HI= +github.com/templexxx/cpu v0.1.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= +github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b h1:XeDLE6c9mzHpdv3Wb1+pWBaWv/BlHK0ZYIu/KaL6eHg= +github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b/go.mod h1:7rwmCH0wC2fQvNEvPZ3sKXukhyCTyiaZ5VTZMQYpZKQ= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= +github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +lukechampine.com/frand v1.4.2 h1:RzFIpOvkMXuPMBb9maa4ND4wjBn71E1Jpf8BzJHMaVw= +lukechampine.com/frand v1.4.2/go.mod h1:4S/TM2ZgrKejMcKMbeLjISpJMO+/eZ1zu3vYX9dtj3s= +nostr.mleku.dev v1.0.14 h1:7F+LG5m1JXyrCCNv4E4R6UsDZv9NTuJcW7+pucLt2Uo= +nostr.mleku.dev v1.0.14/go.mod h1:xqzXOp6jwWEmVK0ueudf2nx8WKvGKzDKlIRRArJP9IQ= +util.mleku.dev v1.0.4 h1:MwWf6aMWUp/7MaLXPByHvqkBMo/v7s9qOe84pkC/dj0= +util.mleku.dev v1.0.4/go.mod h1:7XKjqFf40X+7TYYnacv0X82r/3EFRhYbeHZR/ENWuDo= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..0dd62ca --- /dev/null +++ b/helpers.go @@ -0,0 +1,12 @@ +package eventstore + +import ( + "bytes" + + "nostr.mleku.dev/codec/event" +) + +func isOlder(prev, next *event.T) bool { + return prev.CreatedAt.I64() < next.CreatedAt.I64() || + (prev.CreatedAt == next.CreatedAt && bytes.Compare(prev.ID, next.ID) < 0) +} diff --git a/ratel/close.go b/ratel/close.go new file mode 100644 index 0000000..b06cd70 --- /dev/null +++ b/ratel/close.go @@ -0,0 +1,22 @@ +package ratel + +import ( + . "nostr.mleku.dev" +) + +func (r *T) Close() (err E) { + Log.I.F("closing database %s", r.Path()) + if err = r.DB.Flatten(4); Chk.E(err) { + return + } + Log.D.F("database flattened") + if err = r.seq.Release(); Chk.E(err) { + return + } + Log.D.F("database released") + if err = r.DB.Close(); Chk.E(err) { + return + } + Log.I.F("database closed") + return +} diff --git a/ratel/countevents.go b/ratel/countevents.go new file mode 100644 index 0000000..ec5bfea --- /dev/null +++ b/ratel/countevents.go @@ -0,0 +1,14 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/filter" +) + +func (r *T) CountEvents(c Ctx, f *filter.T) (count N, err E) { + var evs []*event.T + evs, err = r.QueryEvents(c, f) + count = len(evs) + return +} diff --git a/ratel/del/del.go b/ratel/del/del.go new file mode 100644 index 0000000..8e6fea1 --- /dev/null +++ b/ratel/del/del.go @@ -0,0 +1,9 @@ +package del + +import "bytes" + +type Items [][]byte + +func (c Items) Len() int { return len(c) } +func (c Items) Less(i, j int) bool { return bytes.Compare(c[i], c[j]) < 0 } +func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/ratel/deleteevent.go b/ratel/deleteevent.go new file mode 100644 index 0000000..b2263dc --- /dev/null +++ b/ratel/deleteevent.go @@ -0,0 +1,79 @@ +package ratel + +import ( + "github.com/dgraph-io/badger/v4" + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/serial" +) + +func (r *T) DeleteEvent(c Ctx, eid *eventid.T) (err E) { + var foundSerial []byte + seri := serial.New(nil) + err = r.View(func(txn *badger.Txn) (err error) { + // query event by id to ensure we don't try to save duplicates + prf := index.Id.Key(id.New(eid)) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(prf) + if it.ValidForPrefix(prf) { + var k []byte + // get the serial + k = it.Item().Key() + // copy serial out + keys.Read(k, index.Empty(), id.New(eventid.New()), seri) + // save into foundSerial + foundSerial = seri.Val + } + return + }) + if Chk.E(err) { + return + } + if foundSerial == nil { + return + } + var indexKeys []B + ev := &event.T{} + var evKey, evb, counterKey B + // fetch the event to get its index keys + err = r.View(func(txn *badger.Txn) (err error) { + // retrieve the event record + evKey = keys.Write(index.New(index.Event), seri) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(evKey) + if it.ValidForPrefix(evKey) { + if evb, err = it.Item().ValueCopy(evb); Chk.E(err) { + return + } + if _, err = ev.MarshalJSON(evb); Chk.E(err) { + return + } + indexKeys = GetIndexKeysForEvent(ev, seri) + counterKey = GetCounterKey(seri) + return + } + return + }) + if Chk.E(err) { + return + } + err = r.Update(func(txn *badger.Txn) (err E) { + if err = txn.Delete(evKey); Chk.E(err) { + } + for _, key := range indexKeys { + if err = txn.Delete(key); Chk.E(err) { + } + } + if err = txn.Delete(counterKey); Chk.E(err) { + return + } + return + }) + return +} diff --git a/ratel/getecounterkey.go b/ratel/getecounterkey.go new file mode 100644 index 0000000..64ff8ad --- /dev/null +++ b/ratel/getecounterkey.go @@ -0,0 +1,14 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/serial" +) + +// GetCounterKey returns the proper counter key for a given event ID. +func GetCounterKey(ser *serial.T) (key B) { + key = index.Counter.Key(ser) + // Log.T.F("counter key %d %d", index.Counter, ser.Uint64()) + return +} diff --git a/ratel/getindexkeysforevent.go b/ratel/getindexkeysforevent.go new file mode 100644 index 0000000..f558727 --- /dev/null +++ b/ratel/getindexkeysforevent.go @@ -0,0 +1,93 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/tag" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "store.mleku.dev/ratel/keys/serial" +) + +// GetIndexKeysForEvent generates all the index keys required to filter for +// events. evtSerial should be the output of Serial() which gets a unique, +// monotonic counter value for each new event. +func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) { + + var err error + keyz = make([][]byte, 0, 18) + ID := id.New(eventid.NewWith(ev.ID)) + CA := createdat.New(ev.CreatedAt) + K := kinder.New(ev.Kind.ToU16()) + PK, _ := pubkey.New(ev.PubKey) + // indexes + { // ~ by id + k := index.Id.Key(ID, ser) + // Log.T.F("id key: %x %0x %0x", k[0], k[1:9], k[9:]) + keyz = append(keyz, k) + } + { // ~ by pubkey+date + k := index.Pubkey.Key(PK, CA, ser) + // Log.T.F("pubkey + date key: %x %0x %0x %0x", + // k[0], k[1:9], k[9:17], k[17:]) + keyz = append(keyz, k) + } + { // ~ by kind+date + k := index.Kind.Key(K, CA, ser) + Log.T.F("kind + date key: %x %0x %0x %0x", + k[0], k[1:3], k[3:11], k[11:]) + keyz = append(keyz, k) + } + { // ~ by pubkey+kind+date + k := index.PubkeyKind.Key(PK, K, CA, ser) + // Log.T.F("pubkey + kind + date key: %x %0x %0x %0x %0x", + // k[0], k[1:9], k[9:11], k[11:19], k[19:]) + keyz = append(keyz, k) + } + // ~ by tag value + date + for i, t := range ev.Tags.T { + // there is no value field + if len(t.Field) < 2 || + // the tag is not a-zA-Z probably (this would permit arbitrary other + // single byte chars) + len(t.Field[0]) != 1 || + // the second field is zero length + len(t.Field[1]) == 0 || + // the second field is more than 100 characters long + len(t.Field[1]) > 100 { + // any of the above is true then the tag is not indexable + continue + } + var firstIndex int + var tt *tag.T + for firstIndex, tt = range ev.Tags.T { + if len(tt.Field) >= 2 && Equals(tt.Field[1], t.Field[1]) { + break + } + } + if firstIndex != i { + // duplicate + continue + } + // get key prefix (with full length) and offset where to write the last + // parts + prf, elems := index.P(0), []keys.Element(nil) + if prf, elems, err = GetTagKeyElements(S(t.Field[1]), CA, ser); Chk.E(err) { + return + } + k := prf.Key(elems...) + Log.T.F("tag '%s': %s key %0x", t.Field[0], t.Field[1:], k) + keyz = append(keyz, k) + } + { // ~ by date only + k := index.CreatedAt.Key(CA, ser) + // Log.T.F("date key: %x %0x %0x", k[0], k[1:9], k[9:]) + keyz = append(keyz, k) + } + return +} diff --git a/ratel/gettagkeyelements.go b/ratel/gettagkeyelements.go new file mode 100644 index 0000000..2144156 --- /dev/null +++ b/ratel/gettagkeyelements.go @@ -0,0 +1,69 @@ +package ratel + +import ( + "strconv" + "strings" + + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/arb" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "store.mleku.dev/ratel/keys/serial" + + "ec.mleku.dev/v2/schnorr" + "util.mleku.dev/hex" +) + +func GetTagKeyElements(tagValue string, CA *createdat.T, + ser *serial.T) (prf index.P, + elems []keys.Element, err error) { + + var pkb []byte + // first check if it might be a public key, fastest test + if len(tagValue) == 2*schnorr.PubKeyBytesLen { + // this could be a pubkey + pkb, err = hex.Dec(tagValue) + if err == nil { + // it's a pubkey + var pkk keys.Element + if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + prf, elems = index.Tag32, keys.Make(pkk, ser) + return + } + } + // check for a tag + if strings.Count(tagValue, ":") == 2 { + // this means we will get 3 pieces here + split := strings.Split(tagValue, ":") + // middle element should be a public key so must be 64 hex ciphers + if len(split[1]) != schnorr.PubKeyBytesLen*2 { + return + } + var k uint16 + var d string + if pkb, err = hex.Dec(split[1]); !Chk.E(err) { + var kin uint64 + if kin, err = strconv.ParseUint(split[0], 10, 16); err == nil { + k = uint16(kin) + d = split[2] + var pk *pubkey.T + if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + prf = index.TagAddr + elems = keys.Make(kinder.New(k), pk, arb.NewFromString(d), CA, + ser) + return + } + } + } + // store whatever as utf-8 + prf = index.Tag + elems = keys.Make(arb.NewFromString(tagValue), CA, ser) + return +} diff --git a/ratel/gettagkeyprefix.go b/ratel/gettagkeyprefix.go new file mode 100644 index 0000000..d9d8831 --- /dev/null +++ b/ratel/gettagkeyprefix.go @@ -0,0 +1,56 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "store.mleku.dev" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/arb" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "util.mleku.dev/hex" +) + +// GetTagKeyPrefix returns tag index prefixes based on the initial field of a +// tag. +// +// There is 3 types of index tag keys: +// +// - TagAddr: [ 8 ][ 2b Kind ][ 8b Pubkey ][ address/URL ][ 8b Serial ] +// +// - Tag32: [ 7 ][ 8b Pubkey ][ 8b Serial ] +// +// - Tag: [ 6 ][ address/URL ][ 8b Serial ] +// +// This function produces the initial bytes without the index. +func GetTagKeyPrefix(tagValue string) (key []byte, err error) { + if k, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 { + // store value in the new special "a" tag index + var pk *pubkey.T + if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + els := []keys.Element{kinder.New(k), pk} + if len(d) > 0 { + els = append(els, arb.NewFromString(d)) + } + key = index.TagAddr.Key(els...) + } else if pkb, _ := hex.Dec(tagValue); len(pkb) == 32 { + // store value as bytes + var pkk *pubkey.T + if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + key = index.Tag32.Key(pkk) + } else { + // store whatever as utf-8 + if len(tagValue) > 0 { + var a *arb.T + a = arb.NewFromString(tagValue) + key = index.Tag.Key(a) + } else { + key = index.Tag.Key() + } + } + return +} diff --git a/ratel/init.go b/ratel/init.go new file mode 100644 index 0000000..eb6b6ea --- /dev/null +++ b/ratel/init.go @@ -0,0 +1,98 @@ +package ratel + +import ( + "encoding/binary" + "errors" + "fmt" + + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys/index" + + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" + "util.mleku.dev/units" +) + +func (r *T) Init(path S) (err E) { + r.dataDir = path + Log.I.Ln("opening ratel event store at", r.Path()) + opts := badger.DefaultOptions(r.dataDir) + opts.BlockCacheSize = int64(r.BlockCacheSize) + opts.BlockSize = units.Mb + opts.CompactL0OnClose = true + opts.LmaxCompaction = true + opts.Compression = options.None + // opts.Compression = options.ZSTD + r.Logger = NewLogger(r.InitLogLevel, r.dataDir) + opts.Logger = r.Logger + if r.DB, err = badger.Open(opts); Chk.E(err) { + return err + } + Log.T.Ln("getting event store sequence index", r.dataDir) + if r.seq, err = r.DB.GetSequence([]byte("events"), 1000); Chk.E(err) { + return err + } + Log.T.Ln("running migrations", r.dataDir) + if err = r.runMigrations(); Chk.E(err) { + return Log.E.Err("error running migrations: %w; %s", err, r.dataDir) + } + if r.DBSizeLimit > 0 { + // go r.GarbageCollector() + } else { + // go r.GCCount() + } + return nil + +} + +const Version = 1 + +func (r *T) runMigrations() (err error) { + return r.Update(func(txn *badger.Txn) (err error) { + var version uint16 + var item *badger.Item + item, err = txn.Get([]byte{index.Version.B()}) + if errors.Is(err, badger.ErrKeyNotFound) { + version = 0 + } else if Chk.E(err) { + return err + } else { + Chk.E(item.Value(func(val []byte) (err error) { + version = binary.BigEndian.Uint16(val) + return + })) + } + // do the migrations in increasing steps (there is no rollback) + if version < Version { + // if there is any data in the relay we will stop and notify the user, otherwise we + // just set version to 1 and proceed + prefix := []byte{index.Id.B()} + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: true, + PrefetchSize: 100, + Prefix: prefix, + }) + defer it.Close() + hasAnyEntries := false + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + hasAnyEntries = true + break + } + if hasAnyEntries { + return fmt.Errorf("your database is at version %d, but in order to migrate up "+ + "to version 1 you must manually export all the events and then import "+ + "again:\n"+ + "run an old version of this software, export the data, then delete the "+ + "database files, run the new version, import the data back it", version) + } + Chk.E(r.bumpVersion(txn, Version)) + } + return nil + }) +} + +func (r *T) bumpVersion(txn *badger.Txn, version uint16) error { + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, version) + return txn.Set([]byte{index.Version.B()}, buf) +} diff --git a/ratel/keys/arb/arb.go b/ratel/keys/arb/arb.go new file mode 100644 index 0000000..0be5d67 --- /dev/null +++ b/ratel/keys/arb/arb.go @@ -0,0 +1,85 @@ +package arb + +import ( + "bytes" + + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" +) + +// T is an arbitrary length byte string. In any construction there can only be one with arbitrary length. Custom lengths +// can be created by calling New with the custom length in it, both for Read and Write operations. +type T struct { + Val B +} + +var _ keys.Element = &T{} + +// New creates a new arb.T. This must have the expected length for the provided byte slice as this is what the Read +// method will aim to copy. In general this will be a bounded field, either the final or only arbitrary length field in +// a key. +func New(b B) (p *T) { + if len(b) == 0 { + Log.T.Ln("empty or nil slice is the same as zero value, " + + "use keys.ReadWithArbElem") + return &T{} + } + return &T{Val: b} +} + +func NewWithLen(l int) (p *T) { return &T{Val: make([]byte, l)} } +func NewFromString(s S) (p *T) { return New([]byte(s)) } + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) == 0 { + Log.T.Ln("empty slice has no effect") + return + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + if len(p.Val) < 1 { + Log.T.Ln("empty slice has no effect") + return + } + if _, err := buf.Read(p.Val); Chk.E(err) { + return nil + } + return p +} + +func (p *T) Len() int { + if p == nil { + panic("uninitialized pointer to arb.T") + } + return len(p.Val) +} + +// ReadWithArbElem is a variant of Read that recognises an arbitrary length element by its zero length and imputes its +// actual length by the byte buffer size and the lengths of the fixed length fields. +// +// For reasons of space efficiency, it is not practical to use TLVs for badger database key fields, so this will panic +// if there is more than one arbitrary length element. +func ReadWithArbElem(b B, elems ...keys.Element) { + var arbEl int + var arbSet bool + l := len(b) + for i, el := range elems { + elLen := el.Len() + l -= elLen + if elLen == 0 { + if arbSet { + panic("cannot have more than one arbitrary length field in a key") + } + arbEl = i + arbSet = true + } + } + // now we can say that the remainder is the correct length for the arb element + elems[arbEl] = New(make([]byte, l)) + buf := bytes.NewBuffer(b) + for _, el := range elems { + el.Read(buf) + } +} diff --git a/ratel/keys/arb/arb_test.go b/ratel/keys/arb/arb_test.go new file mode 100644 index 0000000..d713c6e --- /dev/null +++ b/ratel/keys/arb/arb_test.go @@ -0,0 +1,22 @@ +package arb + +import ( + "bytes" + "testing" + + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + randomBytes := frand.Bytes(frand.Intn(128)) + v := New(randomBytes) + buf := new(bytes.Buffer) + v.Write(buf) + randomCopy := make([]byte, len(randomBytes)) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(randomCopy) + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/ratel/keys/count/count.go b/ratel/keys/count/count.go new file mode 100644 index 0000000..5e6c27c --- /dev/null +++ b/ratel/keys/count/count.go @@ -0,0 +1,45 @@ +package count + +import ( + "nostr.mleku.dev/codec/timestamp" +) + +type Item struct { + Serial uint64 + Size uint32 + Freshness *timestamp.T +} + +type Items []*Item + +func (c Items) Len() int { return len(c) } +func (c Items) Less(i, j int) bool { return c[i].Freshness.I64() < c[j].Freshness.I64() } +func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c Items) Total() (total int) { + for i := range c { + total += int(c[i].Size) + } + return +} + +type ItemsBySerial []*Item + +func (c ItemsBySerial) Len() int { return len(c) } +func (c ItemsBySerial) Less(i, j int) bool { return c[i].Serial < c[j].Serial } +func (c ItemsBySerial) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c ItemsBySerial) Total() (total int) { + for i := range c { + total += int(c[i].Size) + } + return +} + +type Fresh struct { + Serial uint64 + Freshness *timestamp.T +} +type Freshes []*Fresh + +func (c Freshes) Len() int { return len(c) } +func (c Freshes) Less(i, j int) bool { return c[i].Serial < c[j].Serial } +func (c Freshes) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/ratel/keys/createdat/createdat.go b/ratel/keys/createdat/createdat.go new file mode 100644 index 0000000..9a1a859 --- /dev/null +++ b/ratel/keys/createdat/createdat.go @@ -0,0 +1,47 @@ +package createdat + +import ( + "bytes" + "encoding/binary" + + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/timestamp" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/serial" +) + +const Len = 8 + +type T struct { + Val *timestamp.T +} + +var _ keys.Element = &T{} + +func New(c *timestamp.T) (p *T) { return &T{Val: c} } + +func (c *T) Write(buf *bytes.Buffer) { + buf.Write(c.Val.Bytes()) +} + +func (c *T) Read(buf *bytes.Buffer) (el keys.Element) { + b := make([]byte, Len) + if n, err := buf.Read(b); Chk.E(err) || n != Len { + return nil + } + c.Val = timestamp.FromUnix(int64(binary.BigEndian.Uint64(b))) + return c +} + +func (c *T) Len() int { return Len } + +// FromKey expects to find a datestamp in the 8 bytes before a serial in a key. +func FromKey(k []byte) (p *T) { + if len(k) < Len+serial.Len { + err := Errorf.F("cannot get a serial without at least %d bytes", Len+serial.Len) + panic(err) + } + key := make([]byte, 0, Len) + key = append(key, k[len(k)-Len-serial.Len:len(k)-serial.Len]...) + return &T{Val: timestamp.FromBytes(key)} +} diff --git a/ratel/keys/createdat/createdat_test.go b/ratel/keys/createdat/createdat_test.go new file mode 100644 index 0000000..469b607 --- /dev/null +++ b/ratel/keys/createdat/createdat_test.go @@ -0,0 +1,25 @@ +package createdat + +import ( + "bytes" + "math" + "testing" + + "lukechampine.com/frand" + "nostr.mleku.dev/codec/timestamp" +) + +func TestT(t *testing.T) { + for _ = range 1000000 { + n := timestamp.FromUnix(int64(frand.Intn(math.MaxInt64))) + v := New(n) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(timestamp.New()) + el := v2.Read(buf2).(*T) + if el.Val.Int() != n.Int() { + t.Fatalf("expected %d got %d", n.Int(), el.Val.Int()) + } + } +} diff --git a/ratel/keys/id/id.go b/ratel/keys/id/id.go new file mode 100644 index 0000000..dac7303 --- /dev/null +++ b/ratel/keys/id/id.go @@ -0,0 +1,58 @@ +package id + +import ( + "bytes" + "fmt" + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" + "strings" + + "nostr.mleku.dev/codec/eventid" + "util.mleku.dev/hex" +) + +const Len = 8 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +func New(evID ...*eventid.T) (p *T) { + if len(evID) < 1 || len(evID[0].String()) < 1 { + return &T{make([]byte, Len)} + } + evid := evID[0].String() + if len(evid) < 64 { + evid = strings.Repeat("0", 64-len(evid)) + evid + } + if len(evid) > 64 { + evid = evid[:64] + } + b, err := hex.Dec(evid[:Len*2]) + if Chk.E(err) { + return + } + return &T{Val: b} +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/ratel/keys/id/id_test.go b/ratel/keys/id/id_test.go new file mode 100644 index 0000000..23f0ec3 --- /dev/null +++ b/ratel/keys/id/id_test.go @@ -0,0 +1,24 @@ +package id + +import ( + "bytes" + "testing" + + sha256 "github.com/minio/sha256-simd" + "lukechampine.com/frand" + "nostr.mleku.dev/codec/eventid" +) + +func TestT(t *testing.T) { + fakeIdBytes := frand.Bytes(sha256.Size) + id := eventid.NewWith(fakeIdBytes) + v := New(id) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New() + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/ratel/keys/index/index.go b/ratel/keys/index/index.go new file mode 100644 index 0000000..c9b79d0 --- /dev/null +++ b/ratel/keys/index/index.go @@ -0,0 +1,48 @@ +package index + +import ( + "bytes" + "fmt" + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" +) + +const Len = 1 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +func New[V byte | P | int](code ...V) (p *T) { + var cod []byte + switch len(code) { + case 0: + cod = []byte{0} + default: + cod = []byte{byte(code[0])} + } + return &T{Val: cod} +} + +func Empty() (p *T) { + return &T{Val: []byte{0}} +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + p.Val = make([]byte, Len) + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/ratel/keys/index/index_test.go b/ratel/keys/index/index_test.go new file mode 100644 index 0000000..5f95f62 --- /dev/null +++ b/ratel/keys/index/index_test.go @@ -0,0 +1,19 @@ +package index + +import ( + "bytes" + "testing" +) + +func TestT(t *testing.T) { + v := Version.Key() + // v := New(n) + // buf := new(bytes.Buffer) + // v.Write(buf) + buf2 := bytes.NewBuffer(v) + v2 := New(0) + el := v2.Read(buf2).(*T) + if el.Val[0] != v[0] { + t.Fatalf("expected %d got %d", v[0], el.Val) + } +} diff --git a/ratel/keys/index/prefixes.go b/ratel/keys/index/prefixes.go new file mode 100644 index 0000000..5aa36bf --- /dev/null +++ b/ratel/keys/index/prefixes.go @@ -0,0 +1,141 @@ +package index + +import ( + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "store.mleku.dev/ratel/keys/serial" +) + +type P byte + +// Key writes a key with the P prefix byte and an arbitrary list of +// keys.Element. +func (p P) Key(element ...keys.Element) (b []byte) { + b = keys.Write( + append([]keys.Element{New(byte(p))}, element...)...) + // Log.T.F("key %x", b) + return +} + +// B returns the index.P as a byte. +func (p P) B() byte { return byte(p) } + +// I returns the index.P as an int (for use with the KeySizes. +func (p P) I() int { return int(p) } + +// GetAsBytes todo wat is dis? +func GetAsBytes(prf ...P) (b [][]byte) { + b = make([][]byte, len(prf)) + for i := range prf { + b[i] = []byte{byte(prf[i])} + } + return +} + +const ( + // Version is the key that stores the version number, the value is a 16-bit + // integer (2 bytes) + // + // [ 255 ][ 2 byte/16 bit version code ] + Version P = 255 +) +const ( + // Event is the prefix used with a Serial counter value provided by badgerDB to + // provide conflict-free 8 byte 64-bit unique keys for event records, which + // follows the prefix. + // + // [ 0 ][ 8 bytes Serial ] + Event P = iota + + // CreatedAt creates an index key that contains the unix + // timestamp of the event record serial. + // + // [ 1 ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + CreatedAt + + // Id contains the first 8 bytes of the ID of the event and the 8 + // byte Serial of the event record. + // + // [ 2 ][ 8 bytes eventid.T prefix ][ 8 bytes Serial ] + Id + + // Kind contains the kind and datestamp. + // + // [ 3 ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Kind + + // Pubkey contains pubkey prefix and timestamp. + // + // [ 4 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Pubkey + + // PubkeyKind contains pubkey prefix, kind and timestamp. + // + // [ 5 ][ 8 bytes pubkey prefix ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + PubkeyKind + + // Tag is for miscellaneous arbitrary length tags, with timestamp and event + // serial after. + // + // [ 6 ][ tag string 1 <= 100 bytes ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Tag + + // Tag32 contains the 8 byte pubkey prefix, timestamp and serial. + // + // [ 7 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Tag32 + + // TagAddr contains the kind, pubkey prefix, value (index 2) of address tag (eg + // relay address), followed by timestamp and serial. + // + // [ 8 ][ 2 byte kind.T][ 8 byte pubkey prefix ][ network address ][ 8 byte timestamp.T ][ 8 byte Serial ] + TagAddr + + // Counter is the eventid.T prefix, value stores the average time of access + // (average of all access timestamps) and the size of the record. + // + // [ 9 ][ 8 bytes Serial ] : value: [ 8 bytes timestamp ] + Counter +) + +// FilterPrefixes is a slice of the prefixes used by filter index to enable a loop +// for pulling events matching a serial +var FilterPrefixes = [][]byte{ + {CreatedAt.B()}, + {Id.B()}, + {Kind.B()}, + {Pubkey.B()}, + {PubkeyKind.B()}, + {Tag.B()}, + {Tag32.B()}, + {TagAddr.B()}, +} + +// KeySizes are the byte size of keys of each type of key prefix. int(P) or call the P.I() method +// corresponds to the index 1:1. For future index additions be sure to add the +// relevant KeySizes sum as it describes the data for a programmer. +var KeySizes = []int{ + // Event + 1 + serial.Len, + // CreatedAt + 1 + createdat.Len + serial.Len, + // Id + 1 + id.Len + serial.Len, + // Kind + 1 + kinder.Len + createdat.Len + serial.Len, + // Pubkey + 1 + pubkey.Len + createdat.Len + serial.Len, + // PubkeyKind + 1 + pubkey.Len + kinder.Len + createdat.Len + serial.Len, + // Tag (worst case scenario) + 1 + 100 + createdat.Len + serial.Len, + // Tag32 + 1 + pubkey.Len + createdat.Len + serial.Len, + // TagAddr + 1 + kinder.Len + pubkey.Len + 100 + createdat.Len + serial.Len, + // Counter + 1 + serial.Len, +} diff --git a/ratel/keys/keys.go b/ratel/keys/keys.go new file mode 100644 index 0000000..4537470 --- /dev/null +++ b/ratel/keys/keys.go @@ -0,0 +1,43 @@ +// Package keys is a composable framework for constructing badger keys from +// fields of events. +package keys + +import ( + "bytes" +) + +// Element is an enveloper for a type that can Read and Write its binary form. +type Element interface { + // Write the binary form of the field into the given bytes.Buffer. + Write(buf *bytes.Buffer) + // Read accepts a bytes.Buffer and decodes a field from it. + Read(buf *bytes.Buffer) Element + // Len gives the length of the bytes output by the type. + Len() int +} + +// Write the contents of each Element to a byte slice. +func Write(elems ...Element) []byte { + // get the length of the buffer required + var length int + for _, el := range elems { + length += el.Len() + } + buf := bytes.NewBuffer(make([]byte, 0, length)) + // write out the data from each element + for _, el := range elems { + el.Write(buf) + } + return buf.Bytes() +} + +// Read the contents of a byte slice into the provided list of Element types. +func Read(b []byte, elems ...Element) { + buf := bytes.NewBuffer(b) + for _, el := range elems { + el.Read(buf) + } +} + +// Make is a convenience method to wrap a list of Element into a slice. +func Make(elems ...Element) []Element { return elems } diff --git a/ratel/keys/keys_test.go b/ratel/keys/keys_test.go new file mode 100644 index 0000000..5fea743 --- /dev/null +++ b/ratel/keys/keys_test.go @@ -0,0 +1,127 @@ +// package keys_test needs to be a different package name or the implementation +// types imports will circular +package keys_test + +import ( + "bytes" + "crypto/sha256" + "testing" + + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "store.mleku.dev/ratel/keys/serial" + + "ec.mleku.dev/v2/schnorr" + "lukechampine.com/frand" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/kind" + "nostr.mleku.dev/codec/timestamp" +) + +func TestElement(t *testing.T) { + for _ = range 100000 { + var failed bool + { // construct a typical key type of structure + // a prefix + np := index.Version + vp := index.New(byte(np)) + // an id + fakeIdBytes := frand.Bytes(sha256.Size) + i := eventid.NewWith(fakeIdBytes) + vid := id.New(i) + // a kinder + n := kind.New(1059) + vk := kinder.New(n.K) + // a pubkey + fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen) + var vpk *pubkey.T + var err error + vpk, err = pubkey.NewFromBytes(fakePubkeyBytes) + if err != nil { + t.Fatal(err) + } + // a createdat + ts := timestamp.Now() + vca := createdat.New(ts) + // a serial + fakeSerialBytes := frand.Bytes(serial.Len) + vs := serial.New(fakeSerialBytes) + // write Element list into buffer + b := keys.Write(vp, vid, vk, vpk, vca, vs) + // check that values decoded all correctly + // we expect the following types, so we must create them: + var vp2 = index.New(0) + var vid2 = id.New() + var vk2 = kinder.New(0) + var vpk2 *pubkey.T + vpk2, err = pubkey.New() + if err != nil { + t.Fatal(err) + } + var vca2 = createdat.New(timestamp.New()) + var vs2 = serial.New(nil) + // read it in + keys.Read(b, vp2, vid2, vk2, vpk2, vca2, vs2) + // this is a lot of tests, so use switch syntax + switch { + case bytes.Compare(vp.Val, vp2.Val) != 0: + t.Logf("failed to decode correctly got %v expected %v", vp2.Val, + vp.Val) + failed = true + fallthrough + case bytes.Compare(vid.Val, vid2.Val) != 0: + t.Logf("failed to decode correctly got %v expected %v", vid2.Val, + vid.Val) + failed = true + fallthrough + case vk.Val.ToU16() != vk2.Val.ToU16(): + t.Logf("failed to decode correctly got %v expected %v", vk2.Val, + vk.Val) + failed = true + fallthrough + case !bytes.Equal(vpk.Val, vpk2.Val): + t.Logf("failed to decode correctly got %v expected %v", vpk2.Val, + vpk.Val) + failed = true + fallthrough + case vca.Val.I64() != vca2.Val.I64(): + t.Logf("failed to decode correctly got %v expected %v", vca2.Val, + vca.Val) + failed = true + fallthrough + case !bytes.Equal(vs.Val, vs2.Val): + t.Logf("failed to decode correctly got %v expected %v", vpk2.Val, + vpk.Val) + failed = true + } + } + { // construct a counter value + // a createdat + ts := timestamp.Now() + vca := createdat.New(ts) + // a sizer + // n := uint32(frand.Uint64n(math.MaxUint32)) + // write out values + b := keys.Write(vca) + // check that values decoded all correctly + // we expect the following types, so we must create them: + var vca2 = createdat.New(timestamp.New()) + // read it in + keys.Read(b, vca2) + // check they match + + if vca.Val.I64() != vca2.Val.I64() { + t.Logf("failed to decode correctly got %v expected %v", vca2.Val, + vca.Val) + failed = true + } + } + if failed { + t.FailNow() + } + } +} diff --git a/ratel/keys/kinder/kind.go b/ratel/keys/kinder/kind.go new file mode 100644 index 0000000..4415966 --- /dev/null +++ b/ratel/keys/kinder/kind.go @@ -0,0 +1,43 @@ +package kinder + +import ( + "bytes" + "encoding/binary" + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" + + "nostr.mleku.dev/codec/kind" +) + +const Len = 2 + +type T struct { + Val *kind.T +} + +var _ keys.Element = &T{} + +// New creates a new kinder.T for reading/writing kind.T values. +func New[V uint16 | int](c V) (p *T) { return &T{Val: kind.New(c)} } + +func Make(c *kind.T) (v []byte) { + v = make([]byte, Len) + binary.BigEndian.PutUint16(v, c.K) + return +} + +func (c *T) Write(buf *bytes.Buffer) { + buf.Write(Make(c.Val)) +} + +func (c *T) Read(buf *bytes.Buffer) (el keys.Element) { + b := make([]byte, Len) + if n, err := buf.Read(b); Chk.E(err) || n != Len { + return nil + } + v := binary.BigEndian.Uint16(b) + c.Val = kind.New(v) + return c +} + +func (c *T) Len() int { return Len } diff --git a/ratel/keys/kinder/kind_test.go b/ratel/keys/kinder/kind_test.go new file mode 100644 index 0000000..4e02bf3 --- /dev/null +++ b/ratel/keys/kinder/kind_test.go @@ -0,0 +1,21 @@ +package kinder + +import ( + "bytes" + "testing" + + "nostr.mleku.dev/codec/kind" +) + +func TestT(t *testing.T) { + n := kind.New(1059) + v := New(n.ToU16()) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(0) + el := v2.Read(buf2).(*T) + if el.Val.ToU16() != n.ToU16() { + t.Fatalf("expected %d got %d", n, el.Val) + } +} diff --git a/ratel/keys/pubkey/pubkey.go b/ratel/keys/pubkey/pubkey.go new file mode 100644 index 0000000..2df5c1b --- /dev/null +++ b/ratel/keys/pubkey/pubkey.go @@ -0,0 +1,75 @@ +package pubkey + +import ( + "bytes" + "fmt" + + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" + + "ec.mleku.dev/v2/schnorr" +) + +const Len = 8 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +// New creates a new pubkey prefix, if parameter is omitted, new one is +// allocated (for read) if more than one is given, only the first is used, and +// if the first one is not the correct hexadecimal length of 64, return error. +func New(pk ...B) (p *T, err E) { + if len(pk) < 1 { + // allows init with no parameter + return &T{make([]byte, Len)}, nil + } + // // only the first pubkey will be used + // if len(pk[0]) != schnorr.PubKeyBytesLen*2 { + // err = Log.E.Err("pubkey hex must be 64 chars, got", len(pk[0])) + // return + // } + // b, err := hex.Dec(pk[0][:Len*2]) + // if Chk.E(err) { + // return + // } + return &T{Val: pk[0][:Len]}, nil +} + +func NewFromBytes(pkb []byte) (p *T, err error) { + if len(pkb) != schnorr.PubKeyBytesLen { + err = Log.E.Err("provided key not correct length, got %d expected %d", + len(pkb), schnorr.PubKeyBytesLen) + Log.T.S(pkb) + return + } + b := make([]byte, Len) + copy(b, pkb[:Len]) + p = &T{Val: b} + return +} + +func (p *T) Write(buf *bytes.Buffer) { + if p == nil { + panic("nil pubkey") + } + if p.Val == nil || len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/ratel/keys/pubkey/pubkey_test.go b/ratel/keys/pubkey/pubkey_test.go new file mode 100644 index 0000000..62d3e45 --- /dev/null +++ b/ratel/keys/pubkey/pubkey_test.go @@ -0,0 +1,28 @@ +package pubkey + +import ( + "bytes" + . "nostr.mleku.dev" + "testing" + + "ec.mleku.dev/v2/schnorr" + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + for _ = range 10000000 { + fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen) + v, err := New(fakePubkeyBytes) + if Chk.E(err) { + t.FailNow() + } + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2, _ := New() + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } + } +} diff --git a/ratel/keys/serial/serial.go b/ratel/keys/serial/serial.go new file mode 100644 index 0000000..bac3928 --- /dev/null +++ b/ratel/keys/serial/serial.go @@ -0,0 +1,81 @@ +package serial + +import ( + "bytes" + "encoding/binary" + "fmt" + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys" +) + +const Len = 8 + +// T is a badger DB serial number used for conflict free event record keys. +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +// New returns a new serial record key.Element - if nil or short slice is given, +// initialize a fresh one with Len (for reading), otherwise if equal or longer, +// trim if long and store into struct (for writing). +func New(ser []byte) (p *T) { + switch { + case len(ser) < Len: + // Log.I.Ln("empty serial") + // allows use of nil to init + ser = make([]byte, Len) + default: + ser = ser[:Len] + } + return &T{Val: ser} +} + +// FromKey expects the last Len bytes of the given slice to be the serial. +func FromKey(k []byte) (p *T) { + if len(k) < Len { + panic(fmt.Sprintf("cannot get a serial without at least 8 bytes %x", k)) + } + key := make([]byte, Len) + copy(key, k[len(k)-Len:]) + return &T{Val: key} +} + +func Make(s uint64) (ser []byte) { + ser = make([]byte, 8) + binary.BigEndian.PutUint64(ser, s) + return +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } +func (p *T) Uint64() (u uint64) { return binary.BigEndian.Uint64(p.Val) } + +// Match compares a key bytes to a serial, all indexes have the serial at +// the end indicating the event record they refer to, and if they match returns +// true. +func Match(index, ser []byte) bool { + l := len(index) + if l < Len { + return false + } + return bytes.Compare(index[l-Len:], ser) == 0 +} diff --git a/ratel/keys/serial/serial_test.go b/ratel/keys/serial/serial_test.go new file mode 100644 index 0000000..dcc6b89 --- /dev/null +++ b/ratel/keys/serial/serial_test.go @@ -0,0 +1,22 @@ +package serial_test + +import ( + "bytes" + "store.mleku.dev/ratel/keys/serial" + "testing" + + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + fakeSerialBytes := frand.Bytes(serial.Len) + v := serial.New(fakeSerialBytes) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := &serial.T{} // or can use New(nil) + el := v2.Read(buf2).(*serial.T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/ratel/log.go b/ratel/log.go new file mode 100644 index 0000000..8e9b590 --- /dev/null +++ b/ratel/log.go @@ -0,0 +1,64 @@ +package ratel + +import ( + "fmt" + "runtime" + "strings" + + . "nostr.mleku.dev" + + "util.mleku.dev/atomic" + "util.mleku.dev/lol" +) + +func NewLogger(logLevel int, label string) (l *logger) { + Log.T.Ln("getting logger for", label) + l = &logger{Label: label} + l.Level.Store(int32(logLevel)) + return +} + +type logger struct { + Level atomic.Int32 + Label string +} + +func (l *logger) SetLogLevel(level int) { + l.Level.Store(int32(level)) +} + +func (l *logger) Errorf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Error { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.E.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Warningf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Warn { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.W.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Infof(s string, i ...interface{}) { + if l.Level.Load() >= lol.Info { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Debugf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Debug { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} diff --git a/ratel/main.go b/ratel/main.go new file mode 100644 index 0000000..e191fad --- /dev/null +++ b/ratel/main.go @@ -0,0 +1,125 @@ +package ratel + +import ( + "encoding/binary" + "sync" + "time" + + "github.com/dgraph-io/badger/v4" + . "nostr.mleku.dev" + "store.mleku.dev" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/serial" + "util.mleku.dev/context" +) + +type T struct { + Ctx context.T + WG *sync.WaitGroup + dataDir string + // DBSizeLimit is the number of bytes we want to keep the data store from exceeding. + DBSizeLimit int + // DBLowWater is the percentage of DBSizeLimit a GC run will reduce the used storage down + // to. + DBLowWater int + // DBHighWater is the trigger point at which a GC run should start if exceeded. + DBHighWater int + // GCFrequency is the frequency of checks of the current utilisation. + GCFrequency time.Duration + HasL2 bool + BlockCacheSize int + InitLogLevel int + Logger *logger + // DB is the badger db enveloper + *badger.DB + // seq is the monotonic collision free index for raw event storage. + seq *badger.Sequence + // Threads is how many CPU threads we dedicate to concurrent actions, flatten and GC mark + Threads int + // MaxLimit is a default limit that applies to a query without a limit, to avoid sending out + // too many events to a client from a malformed or excessively broad filter. + MaxLimit int + // ActuallyDelete sets whether we actually delete or rewrite deleted entries with a modified + // deleted prefix value (8th bit set) + ActuallyDelete bool +} + +var _ eventstore.I = (*T)(nil) + +// GetBackend returns a reasonably configured badger.Backend. +// +// The variadic params correspond to DBSizeLimit, DBLowWater, DBHighWater and +// GCFrequency as an integer multiplier of number of seconds. +// +// Note that the cancel function for the context needs to be managed by the +// caller. +func GetBackend(Ctx context.T, WG *sync.WaitGroup, path S, hasL2 bool, + blockCacheSize, logLevel, + maxLimit int, params ...int) (b *T) { + var sizeLimit, lw, hw, freq = 0, 86, 92, 60 + switch len(params) { + case 4: + freq = params[3] + fallthrough + case 3: + hw = params[2] + fallthrough + case 2: + lw = params[1] + fallthrough + case 1: + sizeLimit = params[0] + } + // if unset, assume a safe maximum limit for unlimited filters. + if maxLimit == 0 { + maxLimit = 512 + } + b = &T{ + Ctx: Ctx, + WG: WG, + DBSizeLimit: sizeLimit, + DBLowWater: lw, + DBHighWater: hw, + GCFrequency: time.Duration(freq) * time.Second, + HasL2: hasL2, + BlockCacheSize: blockCacheSize, + InitLogLevel: logLevel, + MaxLimit: maxLimit, + dataDir: path, + } + return +} + +func (r *T) Path() S { return r.dataDir } + +// SerialKey returns a key used for storing events, and the raw serial counter +// bytes to copy into index keys. +func (r *T) SerialKey() (idx []byte, ser *serial.T) { + var err error + var s []byte + if s, err = r.SerialBytes(); Chk.E(err) { + panic(err) + } + ser = serial.New(s) + return index.Event.Key(ser), ser +} + +// Serial returns the next monotonic conflict free unique serial on the database. +func (r *T) Serial() (ser uint64, err error) { + if ser, err = r.seq.Next(); Chk.E(err) { + } + // Log.T.F("serial %x", ser) + return +} + +// SerialBytes returns a new serial value, used to store an event record with a +// conflict-free unique code (it is a monotonic, atomic, ascending counter). +func (r *T) SerialBytes() (ser []byte, err error) { + var serU64 uint64 + if serU64, err = r.Serial(); Chk.E(err) { + panic(err) + } + ser = make([]byte, serial.Len) + binary.BigEndian.PutUint64(ser, serU64) + return +} diff --git a/ratel/nuke.go b/ratel/nuke.go new file mode 100644 index 0000000..b222d2b --- /dev/null +++ b/ratel/nuke.go @@ -0,0 +1,28 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys/index" +) + +func (r *T) Nuke() (err E) { + Log.W.F("nukening database at %s", r.dataDir) + if err = r.DB.DropPrefix([][]byte{ + {index.Event.B()}, + {index.CreatedAt.B()}, + {index.Id.B()}, + {index.Kind.B()}, + {index.Pubkey.B()}, + {index.PubkeyKind.B()}, + {index.Tag.B()}, + {index.Tag32.B()}, + {index.TagAddr.B()}, + {index.Counter.B()}, + }...); Chk.E(err) { + return + } + if err = r.DB.RunValueLogGC(0.8); Chk.E(err) { + return + } + return +} diff --git a/ratel/preparequeries.go b/ratel/preparequeries.go new file mode 100644 index 0000000..1170717 --- /dev/null +++ b/ratel/preparequeries.go @@ -0,0 +1,184 @@ +package ratel + +import ( + "encoding/binary" + "fmt" + "math" + + . "nostr.mleku.dev" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/kinder" + "store.mleku.dev/ratel/keys/pubkey" + "store.mleku.dev/ratel/keys/serial" + + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/filter" + "nostr.mleku.dev/codec/timestamp" +) + +type Results struct { + Ev *event.T + TS *timestamp.T + Ser *serial.T +} + +type query struct { + index int + queryFilter *filter.T + searchPrefix []byte + start []byte + skipTS bool +} + +// PrepareQueries analyses a filter and generates a set of query specs that produce +// key prefixes to search for in the badger key indexes. +func PrepareQueries(f *filter.T) ( + qs []query, + ext *filter.T, + since uint64, + err error, +) { + if f == nil { + err = Errorf.E("filter cannot be nil") + } + switch { + // first if there is IDs, just search for them, this overrides all other filters + case len(f.IDs.Field) > 0: + qs = make([]query, f.IDs.Len()) + for i, idHex := range f.IDs.Field { + ih := id.New(eventid.NewWith(B(idHex))) + if ih == nil { + Log.E.F("failed to decode event ID: %s", idHex) + // just ignore it, clients will be clients + continue + } + prf := index.Id.Key(ih) + // Log.T.F("id prefix to search on %0x from key %0x", prf, ih.Val) + qs[i] = query{ + index: i, + queryFilter: f, + searchPrefix: prf, + skipTS: true, + } + } + // Log.T.S("ids", qs) + // second we make a set of queries based on author pubkeys, optionally with kinds + case f.Authors.Len() > 0: + // if there is no kinds, we just make the queries based on the author pub keys + if f.Kinds.Len() == 0 { + qs = make([]query, f.Authors.Len()) + for i, pubkeyHex := range f.Authors.Field { + var pk *pubkey.T + if pk, err = pubkey.New(pubkeyHex); Chk.E(err) { + // bogus filter, continue anyway + continue + } + sp := index.Pubkey.Key(pk) + // Log.I.F("search only for authors %0x from pub key %0x", sp, pk.Val) + qs[i] = query{ + index: i, + queryFilter: f, + searchPrefix: sp, + } + } + // Log.I.S("authors", qs) + } else { + // if there is kinds as well, we are searching via the kind/pubkey prefixes + qs = make([]query, f.Authors.Len()*f.Kinds.Len()) + i := 0 + authors: + for _, pubkeyHex := range f.Authors.Field { + for _, kind := range f.Kinds.K { + var pk *pubkey.T + if pk, err = pubkey.New(pubkeyHex); Chk.E(err) { + // skip this dodgy thing + continue authors + } + ki := kinder.New(kind.K) + sp := index.PubkeyKind.Key(pk, ki) + // Log.T.F("search for authors from pub key %0x and kind %0x", pk.Val, ki.Val) + qs[i] = query{index: i, queryFilter: f, searchPrefix: sp} + i++ + } + } + // Log.T.S("authors/kinds", qs) + } + if f.Tags != nil && f.Tags.T != nil || f.Tags.Len() > 0 { + ext = &filter.T{Tags: f.Tags} + // Log.T.S("extra filter", ext) + } + case f.Tags.Len() > 0: + // determine the size of the queries array by inspecting all tags sizes + size := 0 + for _, values := range f.Tags.T { + size += values.Len() - 1 + } + if size == 0 { + return nil, nil, 0, fmt.Errorf("empty tag filters") + } + // we need a query for each tag search + qs = make([]query, size) + // and any kinds mentioned as well in extra filter + ext = &filter.T{Kinds: f.Kinds} + i := 0 + Log.T.S(f.Tags.T) + for _, values := range f.Tags.T { + Log.T.S(values.Field) + for _, value := range values.Field[1:] { + // get key prefix (with full length) and offset where to write the last parts + var prf []byte + if prf, err = GetTagKeyPrefix(S(value)); Chk.E(err) { + continue + } + // remove the last part to get just the prefix we want here + Log.T.F("search for tags from %0x", prf) + qs[i] = query{index: i, queryFilter: f, searchPrefix: prf} + i++ + } + } + // Log.T.S("tags", qs) + case f.Kinds.Len() > 0: + // if there is no ids, pubs or tags, we are just searching for kinds + qs = make([]query, f.Kinds.Len()) + for i, kind := range f.Kinds.K { + kk := kinder.New(kind.K) + ki := index.Kind.Key(kk) + qs[i] = query{ + index: i, + queryFilter: f, + searchPrefix: ki, + } + } + // Log.T.S("kinds", qs) + default: + if len(qs) > 0 { + qs[0] = query{index: 0, queryFilter: f, + searchPrefix: index.CreatedAt.Key()} + ext = nil + } + // Log.T.S("other", qs) + } + var until uint64 = math.MaxUint64 + if f.Until != nil { + if fu := uint64(*f.Until); fu < until { + until = fu - 1 + } + } + for i, q := range qs { + qs[i].start = binary.BigEndian.AppendUint64(q.searchPrefix, until) + } + // this is where we'll end the iteration + if f.Since != nil { + if fs := uint64(*f.Since); fs > since { + since = fs + } + } + // if we got an empty filter, we still need a query for scraping the newest + if len(qs) == 0 { + qs = append(qs, query{index: 0, queryFilter: f, searchPrefix: B{1}, + start: B{1, 255, 255, 255, 255, 255, 255, 255, 255}}) + } + return +} diff --git a/ratel/queryevents.go b/ratel/queryevents.go new file mode 100644 index 0000000..617bdce --- /dev/null +++ b/ratel/queryevents.go @@ -0,0 +1,158 @@ +package ratel + +import ( + "github.com/dgraph-io/badger/v4" + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/filter" + "nostr.mleku.dev/codec/tag" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/serial" +) + +func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) { + Log.T.F("query for events\n%s", f) + var queries []query + var extraFilter *filter.T + var since uint64 + if queries, extraFilter, since, err = PrepareQueries(f); Chk.E(err) { + return + } + var limit bool + if f.Limit != 0 { + Log.T.S("query has a limit") + limit = true + } + Log.T.S(queries, extraFilter) + // search for the keys generated from the filter + var eventKeys [][]byte + for _, q := range queries { + Log.T.S(q, extraFilter) + err = r.View(func(txn *badger.Txn) (err E) { + // iterate only through keys and in reverse order + opts := badger.IteratorOptions{ + Reverse: true, + } + it := txn.NewIterator(opts) + defer it.Close() + // for it.Rewind(); it.Valid(); it.Next() { + for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() { + item := it.Item() + k := item.KeyCopy(nil) + Log.T.S(k) + if !q.skipTS { + if len(k) < createdat.Len+serial.Len { + continue + } + createdAt := createdat.FromKey(k) + Log.T.F("%d < %d", createdAt.Val.U64(), since) + if createdAt.Val.U64() < since { + break + } + } + ser := serial.FromKey(k) + eventKeys = append(eventKeys, index.Event.Key(ser)) + } + return + }) + if Chk.E(err) { + // this can't actually happen because the View function above does not set err. + } + search: + for _, eventKey := range eventKeys { + // Log.I.S(eventKey) + var v B + err = r.View(func(txn *badger.Txn) (err E) { + opts := badger.IteratorOptions{Reverse: true} + it := txn.NewIterator(opts) + defer it.Close() + // for it.Rewind(); it.Valid(); it.Next() { + for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() { + item := it.Item() + k := item.KeyCopy(nil) + Log.T.S(k) + if v, err = item.ValueCopy(nil); Chk.E(err) { + continue + } + if r.HasL2 && len(v) == sha256.Size { + // this is a stub entry that indicates an L2 needs to be accessed for it, so + // we populate only the event.T.ID and return the result, the caller will + // expect this as a signal to query the L2 event store. + ev := &event.T{} + Log.T.F("found event stub %0x must seek in L2", v) + ev.ID = v + select { + case <-c.Done(): + return + case <-r.Ctx.Done(): + Log.T.Ln("backend context canceled") + return + default: + } + evs = append(evs, ev) + return + } + } + return + }) + if v == nil { + continue + } + ev := &event.T{} + var rem B + if rem, err = ev.UnmarshalBinary(v); Chk.E(err) { + return + } + Log.T.S(ev) + if len(rem) > 0 { + Log.T.S(rem) + } + // check if this matches the other filters that were not part of the index. + if extraFilter == nil || extraFilter.Matches(ev) { + // check if this event is replaced by one we already have in the result. + if ev.Kind.IsReplaceable() { + for _, evc := range evs { + // replaceable means there should be only the newest for the pubkey and + // kind. + if Equals(ev.PubKey, evc.PubKey) && ev.Kind.Equal(evc.Kind) { + // we won't add it to the results slice + continue search + } + } + } + if ev.Kind.IsParameterizedReplaceable() && + ev.Tags.GetFirst(tag.New("d")) != nil { + for _, evc := range evs { + // parameterized replaceable means there should only be the newest for a + // pubkey, kind and the value field of the `d` tag. + if ev.Kind.Equal(evc.Kind) && Equals(ev.PubKey, evc.PubKey) && + Equals(ev.Tags.GetFirst(tag.New("d")).Value(), + ev.Tags.GetFirst(tag.New("d")).Value()) { + // we won't add it to the results slice + continue search + } + } + } + Log.T.F("sending back result\n%s\n", ev) + evs = append(evs, ev) + if limit { + f.Limit-- + if f.Limit == 0 { + return + } + } else { + // if there is no limit, cap it at the MaxLimit, assume this was the intent + // or the client is erroneous, if any limit greater is requested this will + // be used instead as the previous clause. + if len(evs) > r.MaxLimit { + return + } + } + } + } + } + Log.T.S(evs) + Log.T.Ln("query complete") + return +} diff --git a/ratel/saveevent.go b/ratel/saveevent.go new file mode 100644 index 0000000..7424690 --- /dev/null +++ b/ratel/saveevent.go @@ -0,0 +1,129 @@ +package ratel + +import ( + "fmt" + + "github.com/dgraph-io/badger/v4" + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/timestamp" + "store.mleku.dev" + "store.mleku.dev/ratel/keys" + "store.mleku.dev/ratel/keys/createdat" + "store.mleku.dev/ratel/keys/id" + "store.mleku.dev/ratel/keys/index" + "store.mleku.dev/ratel/keys/serial" +) + +func (r *T) SaveEvent(c Ctx, ev *event.T) (err E) { + if ev.Kind.IsEphemeral() { + Log.T.F("not saving ephemeral event\n%s", ev.Serialize()) + // send it out + return + } + Log.T.C(func() S { + evs, _ := ev.MarshalJSON(nil) + return fmt.Sprintf("saving event\n%d %s", len(evs), evs) + }) + // make sure Close waits for this to complete + r.WG.Add(1) + defer r.WG.Done() + // first, search to see if the event ID already exists. + var foundSerial []byte + seri := serial.New(nil) + err = r.View(func(txn *badger.Txn) (err error) { + // query event by id to ensure we don't try to save duplicates + prf := index.Id.Key(id.New(eventid.NewWith(ev.ID))) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(prf) + if it.ValidForPrefix(prf) { + var k []byte + // get the serial + k = it.Item().Key() + // copy serial out + keys.Read(k, index.Empty(), id.New(eventid.New()), seri) + // save into foundSerial + foundSerial = seri.Val + } + return + }) + if Chk.E(err) { + return + } + if foundSerial != nil { + Log.T.Ln("found possible duplicate or stub for %s", ev) + err = r.Update(func(txn *badger.Txn) (err error) { + // retrieve the event record + evKey := keys.Write(index.New(index.Event), seri) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(evKey) + if it.ValidForPrefix(evKey) { + if it.Item().ValueSize() != sha256.Size { + // not a stub, we already have it + Log.T.Ln("duplicate event", ev.ID) + return eventstore.ErrDupEvent + } + // we only need to restore the event binary and write the access counter key + // encode to binary + var bin B + if bin, err = ev.MarshalBinary(bin); Chk.E(err) { + return + } + if err = txn.Set(it.Item().Key(), bin); Chk.E(err) { + return + } + // bump counter key + counterKey := GetCounterKey(seri) + val := keys.Write(createdat.New(timestamp.Now())) + if err = txn.Set(counterKey, val); Chk.E(err) { + return + } + return + } + return + }) + // if it was a dupe, we are done. + if err != nil { + return + } + return + } + var bin B + if bin, err = ev.MarshalBinary(bin); Chk.E(err) { + return + } + Log.T.F("saving event to badger %s", ev) + // otherwise, save new event record. + if err = r.Update(func(txn *badger.Txn) (err error) { + var idx []byte + var ser *serial.T + idx, ser = r.SerialKey() + // encode to binary + // raw event store + if err = txn.Set(idx, bin); Chk.E(err) { + return + } + // add the indexes + var indexKeys [][]byte + indexKeys = GetIndexKeysForEvent(ev, ser) + for _, k := range indexKeys { + if err = txn.Set(k, nil); Chk.E(err) { + return + } + } + // initialise access counter key + counterKey := GetCounterKey(ser) + val := keys.Write(createdat.New(timestamp.Now())) + if err = txn.Set(counterKey, val); Chk.E(err) { + return + } + Log.T.F("event saved %0x %s", ev.ID, r.dataDir) + return + }); Chk.E(err) { + return + } + return +} diff --git a/relay_interface.go b/relay_interface.go new file mode 100644 index 0000000..2097fde --- /dev/null +++ b/relay_interface.go @@ -0,0 +1,90 @@ +package eventstore + +import ( + "errors" + "fmt" + "sort" + + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/filter" + "nostr.mleku.dev/protocol/ws" +) + +// RelayInterface is a wrapper thing that unifies Store and Relay under a// common API. +type RelayInterface interface { + Publish(c Ctx, evt *event.T) E + QuerySync(c Ctx, f *filter.T, + opts ...ws.SubscriptionOption) ([]*event.T, E) +} + +type RelayWrapper struct { + I +} + +var _ RelayInterface = (*RelayWrapper)(nil) + +func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) { + // var ch event.C + // defer close(ch) + if evt.Kind.IsEphemeral() { + // do not store ephemeral events + return nil + // todo: rewrite to fit new API + // } else if evt.Kind.IsReplaceable() { + // // replaceable event, delete before storing + // ch, err = w.Store.QueryEvents(c, &filter.T{ + // Authors: []string{evt.PubKey}, + // Kinds: kinds.T{evt.Kind}, + // }) + // if err != nil { + // return fmt.Errorf("failed to query before replacing: %w", err) + // } + // if previous := <-ch; previous != nil && isOlder(previous, evt) { + // if err = w.Store.DeleteEvent(c, previous); err != nil { + // return fmt.Errorf("failed to delete event for replacing: %w", err) + // } + // } + // } else if evt.Kind.IsParameterizedReplaceable() { + // parameterized replaceable event, delete before storing + // d := evt.Tags.GetFirst([]string{"d", ""}) + // if d != nil { + // ch, err = w.Store.QueryEvents(c, &filter.T{ + // Authors: []string{evt.PubKey}, + // Kinds: kinds.T{evt.Kind}, + // Tags: filter.TagMap{"d": []string{d.Value()}}, + // }) + // if err != nil { + // return fmt.Errorf("failed to query before parameterized replacing: %w", err) + // } + // if previous := <-ch; previous != nil && isOlder(previous, evt) { + // if err = w.Store.DeleteEvent(c, previous); Chk.D(err) { + // return fmt.Errorf("failed to delete event for parameterized replacing: %w", err) + // } + // } + // } + } + if err = w.SaveEvent(c, evt); Chk.E(err) && !errors.Is(err, ErrDupEvent) { + return Errorf.E("failed to save: %w", err) + } + return nil +} + +func (w RelayWrapper) QuerySync(c Ctx, f *filter.T, + opts ...ws.SubscriptionOption) ([]*event.T, E) { + + evs, err := w.I.QueryEvents(c, f) + if Chk.E(err) { + return nil, fmt.Errorf("failed to query: %w", err) + } + n := f.Limit + if n != 0 { + results := make(event.Descending, 0, n) + for _, ev := range evs { + results = append(results, ev) + } + sort.Sort(results) + return results, nil + } + return nil, nil +} diff --git a/store_interface.go b/store_interface.go new file mode 100644 index 0000000..dcf463f --- /dev/null +++ b/store_interface.go @@ -0,0 +1,43 @@ +package eventstore + +import ( + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/filter" + "store.mleku.dev/ratel/del" +) + +// I is an types for a persistence layer for nostr events handled by a relay. +type I interface { + // Init is called at the very beginning by [Server.Start], after [Relay.Init], allowing a + // storage to initialize its internal resources. The parameters can be used by the database + // implementations to set custom parameters such as cache management and other relevant + // parameters to the specific implementation. + Init(path S) (err E) + // Path returns the directory of the database. + Path() S + // Close must be called after you're done using the store, to free up resources and so on. + Close() (err E) + // Nuke deletes everything in the database. + Nuke() (err E) + // QueryEvents is invoked upon a client's REQ as described in NIP-01. it should return a + // channel with the events as they're recovered from a database. the channel should be + // closed after the events are all delivered. + QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) + // CountEvents performs the same work as QueryEvents but instead of delivering the events + // that were found it just returns the count of events + CountEvents(c Ctx, f *filter.T) (count N, err E) + // DeleteEvent is used to handle deletion events, as per NIP-09. + DeleteEvent(c Ctx, ev *eventid.T) (err E) + // SaveEvent is called once Relay.AcceptEvent reports true. + SaveEvent(c Ctx, ev *event.T) (err E) +} + +// Cache is a sketch of an expanded enveloper that might be used for a size-constrained event +// store. +type Cache interface { + I + GCCount() (deleteItems del.Items, err E) + Delete(serials del.Items) (err E) +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..b0cd9a2 --- /dev/null +++ b/utils.go @@ -0,0 +1,37 @@ +package eventstore + +import ( + "bytes" + "strconv" + "strings" + + . "nostr.mleku.dev" + + "nostr.mleku.dev/codec/tag" + "util.mleku.dev/hex" +) + +func GetAddrTagElements(tagValue S) (k uint16, pkb B, d S) { + split := strings.Split(tagValue, ":") + if len(split) == 3 { + if pkb, _ = hex.Dec(split[1]); len(pkb) == 32 { + if key, err := strconv.ParseUint(split[0], 10, 16); err == nil { + return uint16(key), pkb, split[2] + } + } + } + return 0, nil, "" +} + +func TagSorter(a, b tag.T) int { + if len(a.Field) < 2 { + if len(b.Field) < 2 { + return 0 + } + return -1 + } + if len(b.Field) < 2 { + return 1 + } + return bytes.Compare(a.Field[1], b.Field[1]) +}