From 6688f6ffc8d9d8d8995894d362c64974882848f0 Mon Sep 17 00:00:00 2001 From: mleku Date: Wed, 11 Sep 2024 11:19:58 +0100 Subject: [PATCH] initial commit --- .idea/.gitignore | 8 ++ .idea/material_theme_project_new.xml | 17 +++ .idea/modules.xml | 8 ++ .idea/ratel.mleku.dev.iml | 13 ++ .idea/vcs.xml | 6 + close.go | 22 ++++ countevents.go | 14 ++ del/del.go | 9 ++ deleteevent.go | 79 ++++++++++++ getecounterkey.go | 14 ++ getindexkeysforevent.go | 93 ++++++++++++++ gettagkeyelements.go | 69 ++++++++++ gettagkeyprefix.go | 56 ++++++++ go.mod | 3 + go.work | 9 ++ init.go | 98 ++++++++++++++ keys/arb/arb.go | 85 +++++++++++++ keys/arb/arb_test.go | 22 ++++ keys/count/count.go | 45 +++++++ keys/createdat/createdat.go | 47 +++++++ keys/createdat/createdat_test.go | 25 ++++ keys/id/id.go | 58 +++++++++ keys/id/id_test.go | 24 ++++ keys/index/index.go | 48 +++++++ keys/index/index_test.go | 19 +++ keys/index/prefixes.go | 141 ++++++++++++++++++++ keys/keys.go | 43 +++++++ keys/keys_test.go | 127 ++++++++++++++++++ keys/kinder/kind.go | 43 +++++++ keys/kinder/kind_test.go | 21 +++ keys/pubkey/pubkey.go | 75 +++++++++++ keys/pubkey/pubkey_test.go | 28 ++++ keys/serial/serial.go | 81 ++++++++++++ keys/serial/serial_test.go | 22 ++++ log.go | 64 ++++++++++ main.go | 125 ++++++++++++++++++ nuke.go | 28 ++++ preparequeries.go | 184 +++++++++++++++++++++++++++ queryevents.go | 159 +++++++++++++++++++++++ saveevent.go | 130 +++++++++++++++++++ 40 files changed, 2162 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/material_theme_project_new.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/ratel.mleku.dev.iml create mode 100644 .idea/vcs.xml create mode 100644 close.go create mode 100644 countevents.go create mode 100644 del/del.go create mode 100644 deleteevent.go create mode 100644 getecounterkey.go create mode 100644 getindexkeysforevent.go create mode 100644 gettagkeyelements.go create mode 100644 gettagkeyprefix.go create mode 100644 go.mod create mode 100644 go.work create mode 100644 init.go create mode 100644 keys/arb/arb.go create mode 100644 keys/arb/arb_test.go create mode 100644 keys/count/count.go create mode 100644 keys/createdat/createdat.go create mode 100644 keys/createdat/createdat_test.go create mode 100644 keys/id/id.go create mode 100644 keys/id/id_test.go create mode 100644 keys/index/index.go create mode 100644 keys/index/index_test.go create mode 100644 keys/index/prefixes.go create mode 100644 keys/keys.go create mode 100644 keys/keys_test.go create mode 100644 keys/kinder/kind.go create mode 100644 keys/kinder/kind_test.go create mode 100644 keys/pubkey/pubkey.go create mode 100644 keys/pubkey/pubkey_test.go create mode 100644 keys/serial/serial.go create mode 100644 keys/serial/serial_test.go create mode 100644 log.go create mode 100644 main.go create mode 100644 nuke.go create mode 100644 preparequeries.go create mode 100644 queryevents.go create mode 100644 saveevent.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/material_theme_project_new.xml b/.idea/material_theme_project_new.xml new file mode 100644 index 0000000..59b792d --- /dev/null +++ b/.idea/material_theme_project_new.xml @@ -0,0 +1,17 @@ + + + + + + + \ No newline at end of 
file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..0f5a816 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/ratel.mleku.dev.iml b/.idea/ratel.mleku.dev.iml new file mode 100644 index 0000000..2986543 --- /dev/null +++ b/.idea/ratel.mleku.dev.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..9661ac7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/close.go b/close.go new file mode 100644 index 0000000..b06cd70 --- /dev/null +++ b/close.go @@ -0,0 +1,22 @@ +package ratel + +import ( + . "nostr.mleku.dev" +) + +func (r *T) Close() (err E) { + Log.I.F("closing database %s", r.Path()) + if err = r.DB.Flatten(4); Chk.E(err) { + return + } + Log.D.F("database flattened") + if err = r.seq.Release(); Chk.E(err) { + return + } + Log.D.F("database released") + if err = r.DB.Close(); Chk.E(err) { + return + } + Log.I.F("database closed") + return +} diff --git a/countevents.go b/countevents.go new file mode 100644 index 0000000..ec5bfea --- /dev/null +++ b/countevents.go @@ -0,0 +1,14 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/filter" +) + +func (r *T) CountEvents(c Ctx, f *filter.T) (count N, err E) { + var evs []*event.T + evs, err = r.QueryEvents(c, f) + count = len(evs) + return +} diff --git a/del/del.go b/del/del.go new file mode 100644 index 0000000..8e6fea1 --- /dev/null +++ b/del/del.go @@ -0,0 +1,9 @@ +package del + +import "bytes" + +type Items [][]byte + +func (c Items) Len() int { return len(c) } +func (c Items) Less(i, j int) bool { return bytes.Compare(c[i], c[j]) < 0 } +func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/deleteevent.go b/deleteevent.go new file mode 100644 index 0000000..a06acfa --- /dev/null +++ b/deleteevent.go @@ -0,0 +1,79 @@ +package ratel + +import ( + "github.com/dgraph-io/badger/v4" + . 
"nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/id" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/serial" +) + +func (r *T) DeleteEvent(c Ctx, eid *eventid.T) (err E) { + var foundSerial []byte + seri := serial.New(nil) + err = r.View(func(txn *badger.Txn) (err error) { + // query event by id to ensure we don't try to save duplicates + prf := index.Id.Key(id.New(eid)) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(prf) + if it.ValidForPrefix(prf) { + var k []byte + // get the serial + k = it.Item().Key() + // copy serial out + keys.Read(k, index.Empty(), id.New(eventid.New()), seri) + // save into foundSerial + foundSerial = seri.Val + } + return + }) + if Chk.E(err) { + return + } + if foundSerial == nil { + return + } + var indexKeys []B + ev := &event.T{} + var evKey, evb, counterKey B + // fetch the event to get its index keys + err = r.View(func(txn *badger.Txn) (err error) { + // retrieve the event record + evKey = keys.Write(index.New(index.Event), seri) + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + it.Seek(evKey) + if it.ValidForPrefix(evKey) { + if evb, err = it.Item().ValueCopy(evb); Chk.E(err) { + return + } + if _, err = ev.MarshalJSON(evb); Chk.E(err) { + return + } + indexKeys = GetIndexKeysForEvent(ev, seri) + counterKey = GetCounterKey(seri) + return + } + return + }) + if Chk.E(err) { + return + } + err = r.Update(func(txn *badger.Txn) (err E) { + if err = txn.Delete(evKey); Chk.E(err) { + } + for _, key := range indexKeys { + if err = txn.Delete(key); Chk.E(err) { + } + } + if err = txn.Delete(counterKey); Chk.E(err) { + return + } + return + }) + return +} diff --git a/getecounterkey.go b/getecounterkey.go new file mode 100644 index 0000000..c3c8be0 --- /dev/null +++ b/getecounterkey.go @@ -0,0 +1,14 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/serial" +) + +// GetCounterKey returns the proper counter key for a given event ID. +func GetCounterKey(ser *serial.T) (key B) { + key = index.Counter.Key(ser) + // Log.T.F("counter key %d %d", index.Counter, ser.Uint64()) + return +} diff --git a/getindexkeysforevent.go b/getindexkeysforevent.go new file mode 100644 index 0000000..6d4e3e6 --- /dev/null +++ b/getindexkeysforevent.go @@ -0,0 +1,93 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/tag" + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/createdat" + "ratel.mleku.dev/keys/id" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "ratel.mleku.dev/keys/serial" +) + +// GetIndexKeysForEvent generates all the index keys required to filter for +// events. evtSerial should be the output of Serial() which gets a unique, +// monotonic counter value for each new event. 
+func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
+	var err error
+	keyz = make([][]byte, 0, 18)
+	ID := id.New(eventid.NewWith(ev.ID))
+	CA := createdat.New(ev.CreatedAt)
+	K := kinder.New(ev.Kind.ToU16())
+	PK, _ := pubkey.New(ev.PubKey)
+	// indexes
+	{ // ~ by id
+		k := index.Id.Key(ID, ser)
+		// Log.T.F("id key: %x %0x %0x", k[0], k[1:9], k[9:])
+		keyz = append(keyz, k)
+	}
+	{ // ~ by pubkey+date
+		k := index.Pubkey.Key(PK, CA, ser)
+		// Log.T.F("pubkey + date key: %x %0x %0x %0x",
+		// 	k[0], k[1:9], k[9:17], k[17:])
+		keyz = append(keyz, k)
+	}
+	{ // ~ by kind+date
+		k := index.Kind.Key(K, CA, ser)
+		Log.T.F("kind + date key: %x %0x %0x %0x",
+			k[0], k[1:3], k[3:11], k[11:])
+		keyz = append(keyz, k)
+	}
+	{ // ~ by pubkey+kind+date
+		k := index.PubkeyKind.Key(PK, K, CA, ser)
+		// Log.T.F("pubkey + kind + date key: %x %0x %0x %0x %0x",
+		// 	k[0], k[1:9], k[9:11], k[11:19], k[19:])
+		keyz = append(keyz, k)
+	}
+	// ~ by tag value + date
+	for i, t := range ev.Tags.T {
+		// there is no value field
+		if len(t.Field) < 2 ||
+			// the tag key is not a single character (any single byte key is
+			// permitted, not only a-zA-Z)
+			len(t.Field[0]) != 1 ||
+			// the value field is zero length
+			len(t.Field[1]) == 0 ||
+			// the value field is more than 100 characters long
+			len(t.Field[1]) > 100 {
+			// if any of the above are true, the tag is not indexable
+			continue
+		}
+		var firstIndex int
+		var tt *tag.T
+		for firstIndex, tt = range ev.Tags.T {
+			if len(tt.Field) >= 2 && Equals(tt.Field[1], t.Field[1]) {
+				break
+			}
+		}
+		if firstIndex != i {
+			// duplicate value; only the first occurrence is indexed
+			continue
+		}
+		// get the key prefix (with full length) and the offset where to write
+		// the last parts
+		prf, elems := index.P(0), []keys.Element(nil)
+		if prf, elems, err = GetTagKeyElements(S(t.Field[1]), CA, ser); Chk.E(err) {
+			return
+		}
+		k := prf.Key(elems...)
+		Log.T.F("tag '%s': %s key %0x", t.Field[0], t.Field[1:], k)
+		keyz = append(keyz, k)
+	}
+	{ // ~ by date only
+		k := index.CreatedAt.Key(CA, ser)
+		// Log.T.F("date key: %x %0x %0x", k[0], k[1:9], k[9:])
+		keyz = append(keyz, k)
+	}
+	return
+}
diff --git a/gettagkeyelements.go b/gettagkeyelements.go
new file mode 100644
index 0000000..4deb995
--- /dev/null
+++ b/gettagkeyelements.go
@@ -0,0 +1,69 @@
+package ratel
+
+import (
+	"strconv"
+	"strings"
+
+	. "nostr.mleku.dev"
"nostr.mleku.dev" + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/arb" + "ratel.mleku.dev/keys/createdat" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "ratel.mleku.dev/keys/serial" + + "ec.mleku.dev/v2/schnorr" + "util.mleku.dev/hex" +) + +func GetTagKeyElements(tagValue string, CA *createdat.T, + ser *serial.T) (prf index.P, + elems []keys.Element, err error) { + + var pkb []byte + // first check if it might be a public key, fastest test + if len(tagValue) == 2*schnorr.PubKeyBytesLen { + // this could be a pubkey + pkb, err = hex.Dec(tagValue) + if err == nil { + // it's a pubkey + var pkk keys.Element + if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + prf, elems = index.Tag32, keys.Make(pkk, ser) + return + } + } + // check for a tag + if strings.Count(tagValue, ":") == 2 { + // this means we will get 3 pieces here + split := strings.Split(tagValue, ":") + // middle element should be a public key so must be 64 hex ciphers + if len(split[1]) != schnorr.PubKeyBytesLen*2 { + return + } + var k uint16 + var d string + if pkb, err = hex.Dec(split[1]); !Chk.E(err) { + var kin uint64 + if kin, err = strconv.ParseUint(split[0], 10, 16); err == nil { + k = uint16(kin) + d = split[2] + var pk *pubkey.T + if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + prf = index.TagAddr + elems = keys.Make(kinder.New(k), pk, arb.NewFromString(d), CA, + ser) + return + } + } + } + // store whatever as utf-8 + prf = index.Tag + elems = keys.Make(arb.NewFromString(tagValue), CA, ser) + return +} diff --git a/gettagkeyprefix.go b/gettagkeyprefix.go new file mode 100644 index 0000000..3095864 --- /dev/null +++ b/gettagkeyprefix.go @@ -0,0 +1,56 @@ +package ratel + +import ( + "eventstore.mleku.dev" + . "nostr.mleku.dev" + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/arb" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "util.mleku.dev/hex" +) + +// GetTagKeyPrefix returns tag index prefixes based on the initial field of a +// tag. +// +// There is 3 types of index tag keys: +// +// - TagAddr: [ 8 ][ 2b Kind ][ 8b Pubkey ][ address/URL ][ 8b Serial ] +// +// - Tag32: [ 7 ][ 8b Pubkey ][ 8b Serial ] +// +// - Tag: [ 6 ][ address/URL ][ 8b Serial ] +// +// This function produces the initial bytes without the index. +func GetTagKeyPrefix(tagValue string) (key []byte, err error) { + if k, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 { + // store value in the new special "a" tag index + var pk *pubkey.T + if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + els := []keys.Element{kinder.New(k), pk} + if len(d) > 0 { + els = append(els, arb.NewFromString(d)) + } + key = index.TagAddr.Key(els...) + } else if pkb, _ := hex.Dec(tagValue); len(pkb) == 32 { + // store value as bytes + var pkk *pubkey.T + if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) { + return + } + key = index.Tag32.Key(pkk) + } else { + // store whatever as utf-8 + if len(tagValue) > 0 { + var a *arb.T + a = arb.NewFromString(tagValue) + key = index.Tag.Key(a) + } else { + key = index.Tag.Key() + } + } + return +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a011c19 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module ratel.mleku.dev + +go 1.22.7 diff --git a/go.work b/go.work new file mode 100644 index 0000000..91a579b --- /dev/null +++ b/go.work @@ -0,0 +1,9 @@ +go 1.22.7 + +use ( + . 
+)
+
+replace nostr.mleku.dev => ../nostr.mleku.dev
+replace util.mleku.dev => ../util.mleku.dev
+replace eventstore.mleku.dev => ../eventstore.mleku.dev
\ No newline at end of file
diff --git a/init.go b/init.go
new file mode 100644
index 0000000..1137019
--- /dev/null
+++ b/init.go
@@ -0,0 +1,98 @@
+package ratel
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+
+	. "nostr.mleku.dev"
+	"ratel.mleku.dev/keys/index"
+
+	"github.com/dgraph-io/badger/v4"
+	"github.com/dgraph-io/badger/v4/options"
+	"util.mleku.dev/units"
+)
+
+func (r *T) Init(path S) (err E) {
+	r.dataDir = path
+	Log.I.Ln("opening ratel event store at", r.Path())
+	opts := badger.DefaultOptions(r.dataDir)
+	opts.BlockCacheSize = int64(r.BlockCacheSize)
+	opts.BlockSize = units.Mb
+	opts.CompactL0OnClose = true
+	opts.LmaxCompaction = true
+	opts.Compression = options.None
+	// opts.Compression = options.ZSTD
+	r.Logger = NewLogger(r.InitLogLevel, r.dataDir)
+	opts.Logger = r.Logger
+	if r.DB, err = badger.Open(opts); Chk.E(err) {
+		return err
+	}
+	Log.T.Ln("getting event store sequence index", r.dataDir)
+	if r.seq, err = r.DB.GetSequence([]byte("events"), 1000); Chk.E(err) {
+		return err
+	}
+	Log.T.Ln("running migrations", r.dataDir)
+	if err = r.runMigrations(); Chk.E(err) {
+		return Log.E.Err("error running migrations: %w; %s", err, r.dataDir)
+	}
+	if r.DBSizeLimit > 0 {
+		// go r.GarbageCollector()
+	} else {
+		// go r.GCCount()
+	}
+	return nil
+}
+
+const Version = 1
+
+func (r *T) runMigrations() (err error) {
+	return r.Update(func(txn *badger.Txn) (err error) {
+		var version uint16
+		var item *badger.Item
+		item, err = txn.Get([]byte{index.Version.B()})
+		if errors.Is(err, badger.ErrKeyNotFound) {
+			version = 0
+		} else if Chk.E(err) {
+			return err
+		} else {
+			Chk.E(item.Value(func(val []byte) (err error) {
+				version = binary.BigEndian.Uint16(val)
+				return
+			}))
+		}
+		// do the migrations in increasing steps (there is no rollback)
+		if version < Version {
+			// if there is any data in the relay we stop and notify the user,
+			// otherwise we just set the version to 1 and proceed
+			prefix := []byte{index.Id.B()}
+			it := txn.NewIterator(badger.IteratorOptions{
+				PrefetchValues: true,
+				PrefetchSize:   100,
+				Prefix:         prefix,
+			})
+			defer it.Close()
+			hasAnyEntries := false
+			for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+				hasAnyEntries = true
+				break
+			}
+			if hasAnyEntries {
+				return fmt.Errorf("your database is at version %d, but in order to migrate up "+
+					"to version 1 you must manually export all the events and then import "+
+					"them again:\n"+
+					"run an old version of this software, export the data, then delete the "+
+					"database files, run the new version, and import the data back in", version)
+			}
+			Chk.E(r.bumpVersion(txn, Version))
+		}
+		return nil
+	})
+}
+
+func (r *T) bumpVersion(txn *badger.Txn, version uint16) error {
+	buf := make([]byte, 2)
+	binary.BigEndian.PutUint16(buf, version)
+	return txn.Set([]byte{index.Version.B()}, buf)
+}
diff --git a/keys/arb/arb.go b/keys/arb/arb.go
new file mode 100644
index 0000000..1acb316
--- /dev/null
+++ b/keys/arb/arb.go
@@ -0,0 +1,85 @@
+package arb
+
+import (
+	"bytes"
+
+	. "nostr.mleku.dev"
+	"ratel.mleku.dev/keys"
+)
+
+// T is an arbitrary length byte string. In any key construction there can be
+// only one element with arbitrary length. Custom lengths can be created by
+// passing a slice of the required length to New (or by using NewWithLen), for
+// both Read and Write operations.
+type T struct {
+	Val B
+}
+
+var _ keys.Element = &T{}
+
+// New creates a new arb.T.
This must have the expected length for the provided byte slice as this is what the Read +// method will aim to copy. In general this will be a bounded field, either the final or only arbitrary length field in +// a key. +func New(b B) (p *T) { + if len(b) == 0 { + Log.T.Ln("empty or nil slice is the same as zero value, " + + "use keys.ReadWithArbElem") + return &T{} + } + return &T{Val: b} +} + +func NewWithLen(l int) (p *T) { return &T{Val: make([]byte, l)} } +func NewFromString(s S) (p *T) { return New([]byte(s)) } + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) == 0 { + Log.T.Ln("empty slice has no effect") + return + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + if len(p.Val) < 1 { + Log.T.Ln("empty slice has no effect") + return + } + if _, err := buf.Read(p.Val); Chk.E(err) { + return nil + } + return p +} + +func (p *T) Len() int { + if p == nil { + panic("uninitialized pointer to arb.T") + } + return len(p.Val) +} + +// ReadWithArbElem is a variant of Read that recognises an arbitrary length element by its zero length and imputes its +// actual length by the byte buffer size and the lengths of the fixed length fields. +// +// For reasons of space efficiency, it is not practical to use TLVs for badger database key fields, so this will panic +// if there is more than one arbitrary length element. +func ReadWithArbElem(b B, elems ...keys.Element) { + var arbEl int + var arbSet bool + l := len(b) + for i, el := range elems { + elLen := el.Len() + l -= elLen + if elLen == 0 { + if arbSet { + panic("cannot have more than one arbitrary length field in a key") + } + arbEl = i + arbSet = true + } + } + // now we can say that the remainder is the correct length for the arb element + elems[arbEl] = New(make([]byte, l)) + buf := bytes.NewBuffer(b) + for _, el := range elems { + el.Read(buf) + } +} diff --git a/keys/arb/arb_test.go b/keys/arb/arb_test.go new file mode 100644 index 0000000..d713c6e --- /dev/null +++ b/keys/arb/arb_test.go @@ -0,0 +1,22 @@ +package arb + +import ( + "bytes" + "testing" + + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + randomBytes := frand.Bytes(frand.Intn(128)) + v := New(randomBytes) + buf := new(bytes.Buffer) + v.Write(buf) + randomCopy := make([]byte, len(randomBytes)) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(randomCopy) + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/keys/count/count.go b/keys/count/count.go new file mode 100644 index 0000000..5e6c27c --- /dev/null +++ b/keys/count/count.go @@ -0,0 +1,45 @@ +package count + +import ( + "nostr.mleku.dev/codec/timestamp" +) + +type Item struct { + Serial uint64 + Size uint32 + Freshness *timestamp.T +} + +type Items []*Item + +func (c Items) Len() int { return len(c) } +func (c Items) Less(i, j int) bool { return c[i].Freshness.I64() < c[j].Freshness.I64() } +func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c Items) Total() (total int) { + for i := range c { + total += int(c[i].Size) + } + return +} + +type ItemsBySerial []*Item + +func (c ItemsBySerial) Len() int { return len(c) } +func (c ItemsBySerial) Less(i, j int) bool { return c[i].Serial < c[j].Serial } +func (c ItemsBySerial) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c ItemsBySerial) Total() (total int) { + for i := range c { + total += int(c[i].Size) + } + return +} + +type Fresh struct { + Serial uint64 + Freshness *timestamp.T +} +type Freshes []*Fresh + +func (c 
Freshes) Len() int { return len(c) } +func (c Freshes) Less(i, j int) bool { return c[i].Serial < c[j].Serial } +func (c Freshes) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/keys/createdat/createdat.go b/keys/createdat/createdat.go new file mode 100644 index 0000000..d59c369 --- /dev/null +++ b/keys/createdat/createdat.go @@ -0,0 +1,47 @@ +package createdat + +import ( + "bytes" + "encoding/binary" + + . "nostr.mleku.dev" + "nostr.mleku.dev/codec/timestamp" + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/serial" +) + +const Len = 8 + +type T struct { + Val *timestamp.T +} + +var _ keys.Element = &T{} + +func New(c *timestamp.T) (p *T) { return &T{Val: c} } + +func (c *T) Write(buf *bytes.Buffer) { + buf.Write(c.Val.Bytes()) +} + +func (c *T) Read(buf *bytes.Buffer) (el keys.Element) { + b := make([]byte, Len) + if n, err := buf.Read(b); Chk.E(err) || n != Len { + return nil + } + c.Val = timestamp.FromUnix(int64(binary.BigEndian.Uint64(b))) + return c +} + +func (c *T) Len() int { return Len } + +// FromKey expects to find a datestamp in the 8 bytes before a serial in a key. +func FromKey(k []byte) (p *T) { + if len(k) < Len+serial.Len { + err := Errorf.F("cannot get a serial without at least %d bytes", Len+serial.Len) + panic(err) + } + key := make([]byte, 0, Len) + key = append(key, k[len(k)-Len-serial.Len:len(k)-serial.Len]...) + return &T{Val: timestamp.FromBytes(key)} +} diff --git a/keys/createdat/createdat_test.go b/keys/createdat/createdat_test.go new file mode 100644 index 0000000..469b607 --- /dev/null +++ b/keys/createdat/createdat_test.go @@ -0,0 +1,25 @@ +package createdat + +import ( + "bytes" + "math" + "testing" + + "lukechampine.com/frand" + "nostr.mleku.dev/codec/timestamp" +) + +func TestT(t *testing.T) { + for _ = range 1000000 { + n := timestamp.FromUnix(int64(frand.Intn(math.MaxInt64))) + v := New(n) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(timestamp.New()) + el := v2.Read(buf2).(*T) + if el.Val.Int() != n.Int() { + t.Fatalf("expected %d got %d", n.Int(), el.Val.Int()) + } + } +} diff --git a/keys/id/id.go b/keys/id/id.go new file mode 100644 index 0000000..2e124ec --- /dev/null +++ b/keys/id/id.go @@ -0,0 +1,58 @@ +package id + +import ( + "bytes" + "fmt" + . 
"nostr.mleku.dev" + "ratel.mleku.dev/keys" + "strings" + + "nostr.mleku.dev/codec/eventid" + "util.mleku.dev/hex" +) + +const Len = 8 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +func New(evID ...*eventid.T) (p *T) { + if len(evID) < 1 || len(evID[0].String()) < 1 { + return &T{make([]byte, Len)} + } + evid := evID[0].String() + if len(evid) < 64 { + evid = strings.Repeat("0", 64-len(evid)) + evid + } + if len(evid) > 64 { + evid = evid[:64] + } + b, err := hex.Dec(evid[:Len*2]) + if Chk.E(err) { + return + } + return &T{Val: b} +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/keys/id/id_test.go b/keys/id/id_test.go new file mode 100644 index 0000000..23f0ec3 --- /dev/null +++ b/keys/id/id_test.go @@ -0,0 +1,24 @@ +package id + +import ( + "bytes" + "testing" + + sha256 "github.com/minio/sha256-simd" + "lukechampine.com/frand" + "nostr.mleku.dev/codec/eventid" +) + +func TestT(t *testing.T) { + fakeIdBytes := frand.Bytes(sha256.Size) + id := eventid.NewWith(fakeIdBytes) + v := New(id) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New() + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/keys/index/index.go b/keys/index/index.go new file mode 100644 index 0000000..1365659 --- /dev/null +++ b/keys/index/index.go @@ -0,0 +1,48 @@ +package index + +import ( + "bytes" + "fmt" + . 
"nostr.mleku.dev" + "ratel.mleku.dev/keys" +) + +const Len = 1 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +func New[V byte | P | int](code ...V) (p *T) { + var cod []byte + switch len(code) { + case 0: + cod = []byte{0} + default: + cod = []byte{byte(code[0])} + } + return &T{Val: cod} +} + +func Empty() (p *T) { + return &T{Val: []byte{0}} +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + p.Val = make([]byte, Len) + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/keys/index/index_test.go b/keys/index/index_test.go new file mode 100644 index 0000000..5f95f62 --- /dev/null +++ b/keys/index/index_test.go @@ -0,0 +1,19 @@ +package index + +import ( + "bytes" + "testing" +) + +func TestT(t *testing.T) { + v := Version.Key() + // v := New(n) + // buf := new(bytes.Buffer) + // v.Write(buf) + buf2 := bytes.NewBuffer(v) + v2 := New(0) + el := v2.Read(buf2).(*T) + if el.Val[0] != v[0] { + t.Fatalf("expected %d got %d", v[0], el.Val) + } +} diff --git a/keys/index/prefixes.go b/keys/index/prefixes.go new file mode 100644 index 0000000..2269fd2 --- /dev/null +++ b/keys/index/prefixes.go @@ -0,0 +1,141 @@ +package index + +import ( + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/createdat" + "ratel.mleku.dev/keys/id" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "ratel.mleku.dev/keys/serial" +) + +type P byte + +// Key writes a key with the P prefix byte and an arbitrary list of +// keys.Element. +func (p P) Key(element ...keys.Element) (b []byte) { + b = keys.Write( + append([]keys.Element{New(byte(p))}, element...)...) + // Log.T.F("key %x", b) + return +} + +// B returns the index.P as a byte. +func (p P) B() byte { return byte(p) } + +// I returns the index.P as an int (for use with the KeySizes. +func (p P) I() int { return int(p) } + +// GetAsBytes todo wat is dis? +func GetAsBytes(prf ...P) (b [][]byte) { + b = make([][]byte, len(prf)) + for i := range prf { + b[i] = []byte{byte(prf[i])} + } + return +} + +const ( + // Version is the key that stores the version number, the value is a 16-bit + // integer (2 bytes) + // + // [ 255 ][ 2 byte/16 bit version code ] + Version P = 255 +) +const ( + // Event is the prefix used with a Serial counter value provided by badgerDB to + // provide conflict-free 8 byte 64-bit unique keys for event records, which + // follows the prefix. + // + // [ 0 ][ 8 bytes Serial ] + Event P = iota + + // CreatedAt creates an index key that contains the unix + // timestamp of the event record serial. + // + // [ 1 ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + CreatedAt + + // Id contains the first 8 bytes of the ID of the event and the 8 + // byte Serial of the event record. + // + // [ 2 ][ 8 bytes eventid.T prefix ][ 8 bytes Serial ] + Id + + // Kind contains the kind and datestamp. + // + // [ 3 ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Kind + + // Pubkey contains pubkey prefix and timestamp. + // + // [ 4 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ] + Pubkey + + // PubkeyKind contains pubkey prefix, kind and timestamp. 
+	//
+	// [ 5 ][ 8 bytes pubkey prefix ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
+	PubkeyKind
+
+	// Tag is for miscellaneous arbitrary length tags, with timestamp and event
+	// serial after.
+	//
+	// [ 6 ][ tag string 1 <= 100 bytes ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
+	Tag
+
+	// Tag32 contains the 8 byte pubkey prefix, timestamp and serial.
+	//
+	// [ 7 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
+	Tag32
+
+	// TagAddr contains the kind, pubkey prefix, value (index 2) of address tag
+	// (eg relay address), followed by timestamp and serial.
+	//
+	// [ 8 ][ 2 byte kind.T][ 8 byte pubkey prefix ][ network address ][ 8 byte timestamp.T ][ 8 byte Serial ]
+	TagAddr
+
+	// Counter is keyed on the event record serial; the value stores the access
+	// timestamp, used to rank records by freshness.
+	//
+	// [ 9 ][ 8 bytes Serial ] : value: [ 8 bytes timestamp ]
+	Counter
+)
+
+// FilterPrefixes is a slice of the prefixes used by the filter index, to
+// enable a loop for pulling events matching a serial.
+var FilterPrefixes = [][]byte{
+	{CreatedAt.B()},
+	{Id.B()},
+	{Kind.B()},
+	{Pubkey.B()},
+	{PubkeyKind.B()},
+	{Tag.B()},
+	{Tag32.B()},
+	{TagAddr.B()},
+}
+
+// KeySizes are the byte sizes of keys for each type of key prefix; int(P) (or
+// the P.I() method) corresponds 1:1 to the index in this slice. For future
+// index additions, be sure to add the relevant KeySizes sum, as it documents
+// the layout for a programmer.
+var KeySizes = []int{
+	// Event
+	1 + serial.Len,
+	// CreatedAt
+	1 + createdat.Len + serial.Len,
+	// Id
+	1 + id.Len + serial.Len,
+	// Kind
+	1 + kinder.Len + createdat.Len + serial.Len,
+	// Pubkey
+	1 + pubkey.Len + createdat.Len + serial.Len,
+	// PubkeyKind
+	1 + pubkey.Len + kinder.Len + createdat.Len + serial.Len,
+	// Tag (worst case scenario)
+	1 + 100 + createdat.Len + serial.Len,
+	// Tag32
+	1 + pubkey.Len + createdat.Len + serial.Len,
+	// TagAddr
+	1 + kinder.Len + pubkey.Len + 100 + createdat.Len + serial.Len,
+	// Counter
+	1 + serial.Len,
+}
diff --git a/keys/keys.go b/keys/keys.go
new file mode 100644
index 0000000..4537470
--- /dev/null
+++ b/keys/keys.go
@@ -0,0 +1,43 @@
+// Package keys is a composable framework for constructing badger keys from
+// fields of events.
+package keys
+
+import (
+	"bytes"
+)
+
+// Element is a wrapper for a type that can Read and Write its binary form.
+type Element interface {
+	// Write the binary form of the field into the given bytes.Buffer.
+	Write(buf *bytes.Buffer)
+	// Read accepts a bytes.Buffer and decodes a field from it.
+	Read(buf *bytes.Buffer) Element
+	// Len gives the length of the bytes output by the type.
+	Len() int
+}
+
+// Write the contents of each Element to a byte slice.
+func Write(elems ...Element) []byte {
+	// get the length of the buffer required
+	var length int
+	for _, el := range elems {
+		length += el.Len()
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, length))
+	// write out the data from each element
+	for _, el := range elems {
+		el.Write(buf)
+	}
+	return buf.Bytes()
+}
+
+// Read the contents of a byte slice into the provided list of Element types.
+func Read(b []byte, elems ...Element) {
+	buf := bytes.NewBuffer(b)
+	for _, el := range elems {
+		el.Read(buf)
+	}
+}
+
+// Make is a convenience method to wrap a list of Element into a slice.
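+//
+// A usage sketch for callers (illustrative only; the element types live in
+// the sibling packages under ratel.mleku.dev/keys and cannot be imported from
+// inside this package):
+//
+//	k := keys.Write(index.New(2), id.New(eid), serial.New(ser)) // compose
+//	id2, ser2 := id.New(), serial.New(nil)                      // zero values
+//	keys.Read(k, index.Empty(), id2, ser2)                      // decompose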
+func Make(elems ...Element) []Element { return elems } diff --git a/keys/keys_test.go b/keys/keys_test.go new file mode 100644 index 0000000..c9fe557 --- /dev/null +++ b/keys/keys_test.go @@ -0,0 +1,127 @@ +// package keys_test needs to be a different package name or the implementation +// types imports will circular +package keys_test + +import ( + "bytes" + "crypto/sha256" + "testing" + + "ratel.mleku.dev/keys" + "ratel.mleku.dev/keys/createdat" + "ratel.mleku.dev/keys/id" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "ratel.mleku.dev/keys/serial" + + "ec.mleku.dev/v2/schnorr" + "lukechampine.com/frand" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/kind" + "nostr.mleku.dev/codec/timestamp" +) + +func TestElement(t *testing.T) { + for _ = range 100000 { + var failed bool + { // construct a typical key type of structure + // a prefix + np := index.Version + vp := index.New(byte(np)) + // an id + fakeIdBytes := frand.Bytes(sha256.Size) + i := eventid.NewWith(fakeIdBytes) + vid := id.New(i) + // a kinder + n := kind.New(1059) + vk := kinder.New(n.K) + // a pubkey + fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen) + var vpk *pubkey.T + var err error + vpk, err = pubkey.NewFromBytes(fakePubkeyBytes) + if err != nil { + t.Fatal(err) + } + // a createdat + ts := timestamp.Now() + vca := createdat.New(ts) + // a serial + fakeSerialBytes := frand.Bytes(serial.Len) + vs := serial.New(fakeSerialBytes) + // write Element list into buffer + b := keys.Write(vp, vid, vk, vpk, vca, vs) + // check that values decoded all correctly + // we expect the following types, so we must create them: + var vp2 = index.New(0) + var vid2 = id.New() + var vk2 = kinder.New(0) + var vpk2 *pubkey.T + vpk2, err = pubkey.New() + if err != nil { + t.Fatal(err) + } + var vca2 = createdat.New(timestamp.New()) + var vs2 = serial.New(nil) + // read it in + keys.Read(b, vp2, vid2, vk2, vpk2, vca2, vs2) + // this is a lot of tests, so use switch syntax + switch { + case bytes.Compare(vp.Val, vp2.Val) != 0: + t.Logf("failed to decode correctly got %v expected %v", vp2.Val, + vp.Val) + failed = true + fallthrough + case bytes.Compare(vid.Val, vid2.Val) != 0: + t.Logf("failed to decode correctly got %v expected %v", vid2.Val, + vid.Val) + failed = true + fallthrough + case vk.Val.ToU16() != vk2.Val.ToU16(): + t.Logf("failed to decode correctly got %v expected %v", vk2.Val, + vk.Val) + failed = true + fallthrough + case !bytes.Equal(vpk.Val, vpk2.Val): + t.Logf("failed to decode correctly got %v expected %v", vpk2.Val, + vpk.Val) + failed = true + fallthrough + case vca.Val.I64() != vca2.Val.I64(): + t.Logf("failed to decode correctly got %v expected %v", vca2.Val, + vca.Val) + failed = true + fallthrough + case !bytes.Equal(vs.Val, vs2.Val): + t.Logf("failed to decode correctly got %v expected %v", vpk2.Val, + vpk.Val) + failed = true + } + } + { // construct a counter value + // a createdat + ts := timestamp.Now() + vca := createdat.New(ts) + // a sizer + // n := uint32(frand.Uint64n(math.MaxUint32)) + // write out values + b := keys.Write(vca) + // check that values decoded all correctly + // we expect the following types, so we must create them: + var vca2 = createdat.New(timestamp.New()) + // read it in + keys.Read(b, vca2) + // check they match + + if vca.Val.I64() != vca2.Val.I64() { + t.Logf("failed to decode correctly got %v expected %v", vca2.Val, + vca.Val) + failed = true + } + } + if failed { + t.FailNow() + } + } +} diff --git a/keys/kinder/kind.go 
b/keys/kinder/kind.go new file mode 100644 index 0000000..abe3982 --- /dev/null +++ b/keys/kinder/kind.go @@ -0,0 +1,43 @@ +package kinder + +import ( + "bytes" + "encoding/binary" + . "nostr.mleku.dev" + "ratel.mleku.dev/keys" + + "nostr.mleku.dev/codec/kind" +) + +const Len = 2 + +type T struct { + Val *kind.T +} + +var _ keys.Element = &T{} + +// New creates a new kinder.T for reading/writing kind.T values. +func New[V uint16 | int](c V) (p *T) { return &T{Val: kind.New(c)} } + +func Make(c *kind.T) (v []byte) { + v = make([]byte, Len) + binary.BigEndian.PutUint16(v, c.K) + return +} + +func (c *T) Write(buf *bytes.Buffer) { + buf.Write(Make(c.Val)) +} + +func (c *T) Read(buf *bytes.Buffer) (el keys.Element) { + b := make([]byte, Len) + if n, err := buf.Read(b); Chk.E(err) || n != Len { + return nil + } + v := binary.BigEndian.Uint16(b) + c.Val = kind.New(v) + return c +} + +func (c *T) Len() int { return Len } diff --git a/keys/kinder/kind_test.go b/keys/kinder/kind_test.go new file mode 100644 index 0000000..4e02bf3 --- /dev/null +++ b/keys/kinder/kind_test.go @@ -0,0 +1,21 @@ +package kinder + +import ( + "bytes" + "testing" + + "nostr.mleku.dev/codec/kind" +) + +func TestT(t *testing.T) { + n := kind.New(1059) + v := New(n.ToU16()) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := New(0) + el := v2.Read(buf2).(*T) + if el.Val.ToU16() != n.ToU16() { + t.Fatalf("expected %d got %d", n, el.Val) + } +} diff --git a/keys/pubkey/pubkey.go b/keys/pubkey/pubkey.go new file mode 100644 index 0000000..bbc22f2 --- /dev/null +++ b/keys/pubkey/pubkey.go @@ -0,0 +1,75 @@ +package pubkey + +import ( + "bytes" + "fmt" + + . "nostr.mleku.dev" + "ratel.mleku.dev/keys" + + "ec.mleku.dev/v2/schnorr" +) + +const Len = 8 + +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +// New creates a new pubkey prefix, if parameter is omitted, new one is +// allocated (for read) if more than one is given, only the first is used, and +// if the first one is not the correct hexadecimal length of 64, return error. +func New(pk ...B) (p *T, err E) { + if len(pk) < 1 { + // allows init with no parameter + return &T{make([]byte, Len)}, nil + } + // // only the first pubkey will be used + // if len(pk[0]) != schnorr.PubKeyBytesLen*2 { + // err = Log.E.Err("pubkey hex must be 64 chars, got", len(pk[0])) + // return + // } + // b, err := hex.Dec(pk[0][:Len*2]) + // if Chk.E(err) { + // return + // } + return &T{Val: pk[0][:Len]}, nil +} + +func NewFromBytes(pkb []byte) (p *T, err error) { + if len(pkb) != schnorr.PubKeyBytesLen { + err = Log.E.Err("provided key not correct length, got %d expected %d", + len(pkb), schnorr.PubKeyBytesLen) + Log.T.S(pkb) + return + } + b := make([]byte, Len) + copy(b, pkb[:Len]) + p = &T{Val: b} + return +} + +func (p *T) Write(buf *bytes.Buffer) { + if p == nil { + panic("nil pubkey") + } + if p.Val == nil || len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } diff --git a/keys/pubkey/pubkey_test.go b/keys/pubkey/pubkey_test.go new file mode 100644 index 0000000..62d3e45 --- /dev/null +++ b/keys/pubkey/pubkey_test.go @@ -0,0 +1,28 @@ +package pubkey + +import ( + "bytes" + . 
"nostr.mleku.dev" + "testing" + + "ec.mleku.dev/v2/schnorr" + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + for _ = range 10000000 { + fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen) + v, err := New(fakePubkeyBytes) + if Chk.E(err) { + t.FailNow() + } + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2, _ := New() + el := v2.Read(buf2).(*T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } + } +} diff --git a/keys/serial/serial.go b/keys/serial/serial.go new file mode 100644 index 0000000..32946d4 --- /dev/null +++ b/keys/serial/serial.go @@ -0,0 +1,81 @@ +package serial + +import ( + "bytes" + "encoding/binary" + "fmt" + . "nostr.mleku.dev" + "ratel.mleku.dev/keys" +) + +const Len = 8 + +// T is a badger DB serial number used for conflict free event record keys. +type T struct { + Val []byte +} + +var _ keys.Element = &T{} + +// New returns a new serial record key.Element - if nil or short slice is given, +// initialize a fresh one with Len (for reading), otherwise if equal or longer, +// trim if long and store into struct (for writing). +func New(ser []byte) (p *T) { + switch { + case len(ser) < Len: + // Log.I.Ln("empty serial") + // allows use of nil to init + ser = make([]byte, Len) + default: + ser = ser[:Len] + } + return &T{Val: ser} +} + +// FromKey expects the last Len bytes of the given slice to be the serial. +func FromKey(k []byte) (p *T) { + if len(k) < Len { + panic(fmt.Sprintf("cannot get a serial without at least 8 bytes %x", k)) + } + key := make([]byte, Len) + copy(key, k[len(k)-Len:]) + return &T{Val: key} +} + +func Make(s uint64) (ser []byte) { + ser = make([]byte, 8) + binary.BigEndian.PutUint64(ser, s) + return +} + +func (p *T) Write(buf *bytes.Buffer) { + if len(p.Val) != Len { + panic(fmt.Sprintln("must use New or initialize Val with len", Len)) + } + buf.Write(p.Val) +} + +func (p *T) Read(buf *bytes.Buffer) (el keys.Element) { + // allow uninitialized struct + if len(p.Val) != Len { + p.Val = make([]byte, Len) + } + if n, err := buf.Read(p.Val); Chk.E(err) || n != Len { + return nil + } + return p +} + +func (p *T) Len() int { return Len } +func (p *T) Uint64() (u uint64) { return binary.BigEndian.Uint64(p.Val) } + +// Match compares a key bytes to a serial, all indexes have the serial at +// the end indicating the event record they refer to, and if they match returns +// true. +func Match(index, ser []byte) bool { + l := len(index) + if l < Len { + return false + } + return bytes.Compare(index[l-Len:], ser) == 0 +} diff --git a/keys/serial/serial_test.go b/keys/serial/serial_test.go new file mode 100644 index 0000000..6be9f38 --- /dev/null +++ b/keys/serial/serial_test.go @@ -0,0 +1,22 @@ +package serial_test + +import ( + "bytes" + "ratel.mleku.dev/keys/serial" + "testing" + + "lukechampine.com/frand" +) + +func TestT(t *testing.T) { + fakeSerialBytes := frand.Bytes(serial.Len) + v := serial.New(fakeSerialBytes) + buf := new(bytes.Buffer) + v.Write(buf) + buf2 := bytes.NewBuffer(buf.Bytes()) + v2 := &serial.T{} // or can use New(nil) + el := v2.Read(buf2).(*serial.T) + if bytes.Compare(el.Val, v.Val) != 0 { + t.Fatalf("expected %x got %x", v.Val, el.Val) + } +} diff --git a/log.go b/log.go new file mode 100644 index 0000000..8e9b590 --- /dev/null +++ b/log.go @@ -0,0 +1,64 @@ +package ratel + +import ( + "fmt" + "runtime" + "strings" + + . 
"nostr.mleku.dev" + + "util.mleku.dev/atomic" + "util.mleku.dev/lol" +) + +func NewLogger(logLevel int, label string) (l *logger) { + Log.T.Ln("getting logger for", label) + l = &logger{Label: label} + l.Level.Store(int32(logLevel)) + return +} + +type logger struct { + Level atomic.Int32 + Label string +} + +func (l *logger) SetLogLevel(level int) { + l.Level.Store(int32(level)) +} + +func (l *logger) Errorf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Error { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.E.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Warningf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Warn { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.W.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Infof(s string, i ...interface{}) { + if l.Level.Load() >= lol.Info { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} + +func (l *logger) Debugf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Debug { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..693d697 --- /dev/null +++ b/main.go @@ -0,0 +1,125 @@ +package ratel + +import ( + "encoding/binary" + "sync" + "time" + + "eventstore.mleku.dev" + "github.com/dgraph-io/badger/v4" + . "nostr.mleku.dev" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/serial" + "util.mleku.dev/context" +) + +type T struct { + Ctx context.T + WG *sync.WaitGroup + dataDir string + // DBSizeLimit is the number of bytes we want to keep the data store from exceeding. + DBSizeLimit int + // DBLowWater is the percentage of DBSizeLimit a GC run will reduce the used storage down + // to. + DBLowWater int + // DBHighWater is the trigger point at which a GC run should start if exceeded. + DBHighWater int + // GCFrequency is the frequency of checks of the current utilisation. + GCFrequency time.Duration + HasL2 bool + BlockCacheSize int + InitLogLevel int + Logger *logger + // DB is the badger db enveloper + *badger.DB + // seq is the monotonic collision free index for raw event storage. + seq *badger.Sequence + // Threads is how many CPU threads we dedicate to concurrent actions, flatten and GC mark + Threads int + // MaxLimit is a default limit that applies to a query without a limit, to avoid sending out + // too many events to a client from a malformed or excessively broad filter. + MaxLimit int + // ActuallyDelete sets whether we actually delete or rewrite deleted entries with a modified + // deleted prefix value (8th bit set) + ActuallyDelete bool +} + +var _ eventstore.I = (*T)(nil) + +// GetBackend returns a reasonably configured badger.Backend. +// +// The variadic params correspond to DBSizeLimit, DBLowWater, DBHighWater and +// GCFrequency as an integer multiplier of number of seconds. +// +// Note that the cancel function for the context needs to be managed by the +// caller. 
+func GetBackend(Ctx context.T, WG *sync.WaitGroup, path S, hasL2 bool, + blockCacheSize, logLevel, + maxLimit int, params ...int) (b *T) { + var sizeLimit, lw, hw, freq = 0, 86, 92, 60 + switch len(params) { + case 4: + freq = params[3] + fallthrough + case 3: + hw = params[2] + fallthrough + case 2: + lw = params[1] + fallthrough + case 1: + sizeLimit = params[0] + } + // if unset, assume a safe maximum limit for unlimited filters. + if maxLimit == 0 { + maxLimit = 512 + } + b = &T{ + Ctx: Ctx, + WG: WG, + DBSizeLimit: sizeLimit, + DBLowWater: lw, + DBHighWater: hw, + GCFrequency: time.Duration(freq) * time.Second, + HasL2: hasL2, + BlockCacheSize: blockCacheSize, + InitLogLevel: logLevel, + MaxLimit: maxLimit, + dataDir: path, + } + return +} + +func (r *T) Path() S { return r.dataDir } + +// SerialKey returns a key used for storing events, and the raw serial counter +// bytes to copy into index keys. +func (r *T) SerialKey() (idx []byte, ser *serial.T) { + var err error + var s []byte + if s, err = r.SerialBytes(); Chk.E(err) { + panic(err) + } + ser = serial.New(s) + return index.Event.Key(ser), ser +} + +// Serial returns the next monotonic conflict free unique serial on the database. +func (r *T) Serial() (ser uint64, err error) { + if ser, err = r.seq.Next(); Chk.E(err) { + } + // Log.T.F("serial %x", ser) + return +} + +// SerialBytes returns a new serial value, used to store an event record with a +// conflict-free unique code (it is a monotonic, atomic, ascending counter). +func (r *T) SerialBytes() (ser []byte, err error) { + var serU64 uint64 + if serU64, err = r.Serial(); Chk.E(err) { + panic(err) + } + ser = make([]byte, serial.Len) + binary.BigEndian.PutUint64(ser, serU64) + return +} diff --git a/nuke.go b/nuke.go new file mode 100644 index 0000000..4eb18a5 --- /dev/null +++ b/nuke.go @@ -0,0 +1,28 @@ +package ratel + +import ( + . "nostr.mleku.dev" + "ratel.mleku.dev/keys/index" +) + +func (r *T) Nuke() (err E) { + Log.W.F("nukening database at %s", r.dataDir) + if err = r.DB.DropPrefix([][]byte{ + {index.Event.B()}, + {index.CreatedAt.B()}, + {index.Id.B()}, + {index.Kind.B()}, + {index.Pubkey.B()}, + {index.PubkeyKind.B()}, + {index.Tag.B()}, + {index.Tag32.B()}, + {index.TagAddr.B()}, + {index.Counter.B()}, + }...); Chk.E(err) { + return + } + if err = r.DB.RunValueLogGC(0.8); Chk.E(err) { + return + } + return +} diff --git a/preparequeries.go b/preparequeries.go new file mode 100644 index 0000000..a6d7b6e --- /dev/null +++ b/preparequeries.go @@ -0,0 +1,184 @@ +package ratel + +import ( + "encoding/binary" + "fmt" + "math" + + . "nostr.mleku.dev" + "ratel.mleku.dev/keys/id" + "ratel.mleku.dev/keys/index" + "ratel.mleku.dev/keys/kinder" + "ratel.mleku.dev/keys/pubkey" + "ratel.mleku.dev/keys/serial" + + "nostr.mleku.dev/codec/event" + "nostr.mleku.dev/codec/eventid" + "nostr.mleku.dev/codec/filter" + "nostr.mleku.dev/codec/timestamp" +) + +type Results struct { + Ev *event.T + TS *timestamp.T + Ser *serial.T +} + +type query struct { + index int + queryFilter *filter.T + searchPrefix []byte + start []byte + skipTS bool +} + +// PrepareQueries analyses a filter and generates a set of query specs that produce +// key prefixes to search for in the badger key indexes. 
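+//
+// For example (an illustration of the mapping, not verified output): a filter
+// with two authors and two kinds produces four queries with PubkeyKind
+// prefixes [ 5 ][ 8b pubkey ][ 2b kind ]; a filter with only IDs produces one
+// query per ID with an Id prefix [ 2 ][ 8b id prefix ]. Conditions that cannot
+// be folded into a prefix (extra tags or kinds) are returned in ext for
+// post-filtering, and since/until bound the timestamp segment of each scan.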
+func PrepareQueries(f *filter.T) (
+	qs []query,
+	ext *filter.T,
+	since uint64,
+	err error,
+) {
+	if f == nil {
+		// nothing can be done with a nil filter
+		err = Errorf.E("filter cannot be nil")
+		return
+	}
+	switch {
+	// first, if there are IDs, just search for them; this overrides all other
+	// filters
+	case len(f.IDs.Field) > 0:
+		qs = make([]query, f.IDs.Len())
+		for i, idHex := range f.IDs.Field {
+			ih := id.New(eventid.NewWith(B(idHex)))
+			if ih == nil {
+				Log.E.F("failed to decode event ID: %s", idHex)
+				// just ignore it, clients will be clients
+				continue
+			}
+			prf := index.Id.Key(ih)
+			// Log.T.F("id prefix to search on %0x from key %0x", prf, ih.Val)
+			qs[i] = query{
+				index:        i,
+				queryFilter:  f,
+				searchPrefix: prf,
+				skipTS:       true,
+			}
+		}
+		// Log.T.S("ids", qs)
+	// second, we make a set of queries based on author pubkeys, optionally
+	// with kinds
+	case f.Authors.Len() > 0:
+		// if there are no kinds, we just make the queries based on the author
+		// pub keys
+		if f.Kinds.Len() == 0 {
+			qs = make([]query, f.Authors.Len())
+			for i, pubkeyHex := range f.Authors.Field {
+				var pk *pubkey.T
+				if pk, err = pubkey.New(pubkeyHex); Chk.E(err) {
+					// bogus filter, continue anyway
+					continue
+				}
+				sp := index.Pubkey.Key(pk)
+				// Log.I.F("search only for authors %0x from pub key %0x", sp, pk.Val)
+				qs[i] = query{
+					index:        i,
+					queryFilter:  f,
+					searchPrefix: sp,
+				}
+			}
+			// Log.I.S("authors", qs)
+		} else {
+			// if there are kinds as well, we are searching via the kind/pubkey
+			// prefixes
+			qs = make([]query, f.Authors.Len()*f.Kinds.Len())
+			i := 0
+		authors:
+			for _, pubkeyHex := range f.Authors.Field {
+				for _, kind := range f.Kinds.K {
+					var pk *pubkey.T
+					if pk, err = pubkey.New(pubkeyHex); Chk.E(err) {
+						// skip this dodgy thing
+						continue authors
+					}
+					ki := kinder.New(kind.K)
+					sp := index.PubkeyKind.Key(pk, ki)
+					// Log.T.F("search for authors from pub key %0x and kind %0x", pk.Val, ki.Val)
+					qs[i] = query{index: i, queryFilter: f, searchPrefix: sp}
+					i++
+				}
+			}
+			// Log.T.S("authors/kinds", qs)
+		}
+		if f.Tags != nil && (f.Tags.T != nil || f.Tags.Len() > 0) {
+			ext = &filter.T{Tags: f.Tags}
+			// Log.T.S("extra filter", ext)
+		}
+	case f.Tags.Len() > 0:
+		// determine the size of the queries array by inspecting all tags sizes
+		size := 0
+		for _, values := range f.Tags.T {
+			size += values.Len() - 1
+		}
+		if size == 0 {
+			return nil, nil, 0, fmt.Errorf("empty tag filters")
+		}
+		// we need a query for each tag search
+		qs = make([]query, size)
+		// and any kinds mentioned as well in extra filter
+		ext = &filter.T{Kinds: f.Kinds}
+		i := 0
+		Log.T.S(f.Tags.T)
+		for _, values := range f.Tags.T {
+			Log.T.S(values.Field)
+			for _, value := range values.Field[1:] {
+				// get the key prefix (with full length) and the offset where
+				// to write the last parts
+				var prf []byte
+				if prf, err = GetTagKeyPrefix(S(value)); Chk.E(err) {
+					continue
+				}
+				// remove the last part to get just the prefix we want here
+				Log.T.F("search for tags from %0x", prf)
+				qs[i] = query{index: i, queryFilter: f, searchPrefix: prf}
+				i++
+			}
+		}
+		// Log.T.S("tags", qs)
+	case f.Kinds.Len() > 0:
+		// if there are no ids, pubkeys or tags, we are just searching for
+		// kinds
+		qs = make([]query, f.Kinds.Len())
+		for i, kind := range f.Kinds.K {
+			kk := kinder.New(kind.K)
+			ki := index.Kind.Key(kk)
+			qs[i] = query{
+				index:        i,
+				queryFilter:  f,
+				searchPrefix: ki,
+			}
+		}
+		// Log.T.S("kinds", qs)
+	default:
+		if len(qs) > 0 {
+			qs[0] = query{index: 0, queryFilter: f,
+				searchPrefix: index.CreatedAt.Key()}
+			ext = nil
+		}
+		// Log.T.S("other", qs)
+	}
+	var until uint64 = math.MaxUint64
+	if f.Until != nil {
+		if fu := uint64(*f.Until); fu < until {
+			until = fu - 1
+		}
+	}
+	for i, q := range qs {
+		qs[i].start = binary.BigEndian.AppendUint64(q.searchPrefix, until)
+	}
+	// this is where we'll end the iteration
+	if f.Since != nil {
+		if fs := uint64(*f.Since); fs > since {
+			since = fs
+		}
+	}
+	// if we got an empty filter, we still need a query for scraping the newest
+	if len(qs) == 0 {
+		qs = append(qs, query{index: 0, queryFilter: f, searchPrefix: B{1},
+			start: B{1, 255, 255, 255, 255, 255, 255, 255, 255}})
+	}
+	return
+}
diff --git a/queryevents.go b/queryevents.go
new file mode 100644
index 0000000..7ebdd75
--- /dev/null
+++ b/queryevents.go
@@ -0,0 +1,159 @@
+package ratel
+
+import (
+	"github.com/dgraph-io/badger/v4"
+	sha256 "github.com/minio/sha256-simd"
+	. "nostr.mleku.dev"
+	"nostr.mleku.dev/codec/event"
+	"nostr.mleku.dev/codec/filter"
+	"nostr.mleku.dev/codec/tag"
+	"ratel.mleku.dev/keys/createdat"
+	"ratel.mleku.dev/keys/index"
+	"ratel.mleku.dev/keys/serial"
+)
+
+func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
+	Log.T.F("query for events\n%s", f)
+	var queries []query
+	var extraFilter *filter.T
+	var since uint64
+	if queries, extraFilter, since, err = PrepareQueries(f); Chk.E(err) {
+		return
+	}
+	var limit bool
+	if f.Limit != 0 {
+		Log.T.S("query has a limit")
+		limit = true
+	}
+	Log.T.S(queries, extraFilter)
+	// search for the keys generated from the filter
+	for _, q := range queries {
+		Log.T.S(q, extraFilter)
+		// eventKeys is reset for each query so keys found by an earlier query
+		// are not fetched and matched a second time
+		var eventKeys [][]byte
+		err = r.View(func(txn *badger.Txn) (err E) {
+			// iterate only through keys and in reverse order
+			opts := badger.IteratorOptions{
+				Reverse: true,
+			}
+			it := txn.NewIterator(opts)
+			defer it.Close()
+			// for it.Rewind(); it.Valid(); it.Next() {
+			for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() {
+				item := it.Item()
+				k := item.KeyCopy(nil)
+				Log.T.S(k)
+				if !q.skipTS {
+					if len(k) < createdat.Len+serial.Len {
+						continue
+					}
+					createdAt := createdat.FromKey(k)
+					Log.T.F("%d < %d", createdAt.Val.U64(), since)
+					if createdAt.Val.U64() < since {
+						break
+					}
+				}
+				ser := serial.FromKey(k)
+				eventKeys = append(eventKeys, index.Event.Key(ser))
+			}
+			return
+		})
+		if Chk.E(err) {
+			// this can't actually happen because the View function above does
+			// not set err.
+		}
+	search:
+		for _, eventKey := range eventKeys {
+			// Log.I.S(eventKey)
+			var v B
+			err = r.View(func(txn *badger.Txn) (err E) {
+				opts := badger.IteratorOptions{Reverse: true}
+				it := txn.NewIterator(opts)
+				defer it.Close()
+				// for it.Rewind(); it.Valid(); it.Next() {
+				for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() {
+					item := it.Item()
+					k := item.KeyCopy(nil)
+					Log.T.S(k)
+					if v, err = item.ValueCopy(nil); Chk.E(err) {
+						continue
+					}
+					if r.HasL2 && len(v) == sha256.Size {
+						// this is a stub entry that indicates an L2 needs to
+						// be accessed for it, so we populate only the
+						// event.T.ID and return the result; the caller will
+						// expect this as a signal to query the L2 event store.
+						ev := &event.T{}
+						Log.T.F("found event stub %0x must seek in L2", v)
+						ev.ID = v
+						select {
+						case <-c.Done():
+							return
+						case <-r.Ctx.Done():
+							Log.T.Ln("backend context canceled")
+							return
+						default:
+						}
+						evs = append(evs, ev)
+						return
+					}
+				}
+				return
+			})
+			if v == nil {
+				continue
+			}
+			ev := &event.T{}
+			var rem B
+			if rem, err = ev.UnmarshalBinary(v); Chk.E(err) {
+				return
+			}
+			Log.T.S(ev)
+			if len(rem) > 0 {
+				Log.T.S(rem)
+			}
+			// check if this matches the other filters that were not part of
+			// the index.
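+			// (for example, with a filter like {"authors":[A],"#e":[X]}, the
+			// tag condition rides along in extraFilter and is checked here
+			// against each candidate pulled from the author index)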
+			if extraFilter == nil || extraFilter.Matches(ev) {
+				// check if this event is replaced by one we already have in
+				// the result.
+				if ev.Kind.IsReplaceable() {
+					for _, evc := range evs {
+						// replaceable means there should be only the newest
+						// for the pubkey and kind.
+						if Equals(ev.PubKey, evc.PubKey) && ev.Kind.Equal(evc.Kind) {
+							// we won't add it to the results slice
+							continue search
+						}
+					}
+				}
+				if ev.Kind.IsParameterizedReplaceable() &&
+					ev.Tags.GetFirst(tag.New("d")) != nil {
+					for _, evc := range evs {
+						// parameterized replaceable means there should only be
+						// the newest for a pubkey, kind and the value field of
+						// the `d` tag.
+						if ev.Kind.Equal(evc.Kind) && Equals(ev.PubKey, evc.PubKey) {
+							// compare with the d tag value of the event
+							// already in the results
+							evcD := evc.Tags.GetFirst(tag.New("d"))
+							if evcD != nil &&
+								Equals(ev.Tags.GetFirst(tag.New("d")).Value(),
+									evcD.Value()) {
+								// we won't add it to the results slice
+								continue search
+							}
+						}
+					}
+				}
+				Log.T.F("sending back result\n%s\n", ev)
+				evs = append(evs, ev)
+				if limit {
+					f.Limit--
+					if f.Limit == 0 {
+						return
+					}
+				} else {
+					// if there is no limit, cap it at MaxLimit, assuming this
+					// was the intent or the client is erroneous; if any
+					// greater limit is requested, the previous clause applies
+					// instead.
+					if len(evs) >= r.MaxLimit {
+						return
+					}
+				}
+			}
+		}
+	}
+	Log.T.S(evs)
+	Log.T.Ln("query complete")
+	return
+}
diff --git a/saveevent.go b/saveevent.go
new file mode 100644
index 0000000..fe489b4
--- /dev/null
+++ b/saveevent.go
@@ -0,0 +1,130 @@
+package ratel
+
+import (
+	"fmt"
+
+	"eventstore.mleku.dev"
+	"github.com/dgraph-io/badger/v4"
+	sha256 "github.com/minio/sha256-simd"
+	. "nostr.mleku.dev"
+	"nostr.mleku.dev/codec/event"
+	"nostr.mleku.dev/codec/eventid"
+	"nostr.mleku.dev/codec/timestamp"
+	"ratel.mleku.dev/keys"
+	"ratel.mleku.dev/keys/createdat"
+	"ratel.mleku.dev/keys/id"
+	"ratel.mleku.dev/keys/index"
+	"ratel.mleku.dev/keys/serial"
+)
+
+func (r *T) SaveEvent(c Ctx, ev *event.T) (err E) {
+	if ev.Kind.IsEphemeral() {
+		Log.T.F("not saving ephemeral event\n%s", ev.Serialize())
+		// send it out
+		return
+	}
+	Log.T.C(func() S {
+		evs, _ := ev.MarshalJSON(nil)
+		return fmt.Sprintf("saving event\n%d %s", len(evs), evs)
+	})
+	// make sure Close waits for this to complete
+	r.WG.Add(1)
+	defer r.WG.Done()
+	// first, search to see if the event ID already exists.
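+	// (a found ID may be either a full duplicate or a 32-byte stub whose body
+	// was pruned to an L2 store; a stub is rehydrated below rather than being
+	// rejected as a duplicate)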
+	var foundSerial []byte
+	seri := serial.New(nil)
+	err = r.View(func(txn *badger.Txn) (err error) {
+		// query event by id to ensure we don't try to save duplicates
+		prf := index.Id.Key(id.New(eventid.NewWith(ev.ID)))
+		it := txn.NewIterator(badger.IteratorOptions{})
+		defer it.Close()
+		it.Seek(prf)
+		if it.ValidForPrefix(prf) {
+			// get the serial
+			k := it.Item().Key()
+			// copy serial out
+			keys.Read(k, index.Empty(), id.New(eventid.New()), seri)
+			// save into foundSerial
+			foundSerial = seri.Val
+		}
+		return
+	})
+	if Chk.E(err) {
+		return
+	}
+	if foundSerial != nil {
+		Log.T.F("found possible duplicate or stub for %s", ev)
+		err = r.Update(func(txn *badger.Txn) (err error) {
+			// retrieve the event record
+			evKey := keys.Write(index.New(index.Event), seri)
+			it := txn.NewIterator(badger.IteratorOptions{})
+			defer it.Close()
+			it.Seek(evKey)
+			if it.ValidForPrefix(evKey) {
+				if it.Item().ValueSize() != sha256.Size {
+					// not a stub, we already have it
+					Log.T.Ln("duplicate event", ev.ID)
+					return eventstore.ErrDupEvent
+				}
+				// we only need to restore the event binary and write the
+				// access counter key
+				// encode to binary
+				var bin B
+				if bin, err = ev.MarshalBinary(bin); Chk.E(err) {
+					return
+				}
+				if err = txn.Set(it.Item().Key(), bin); Chk.E(err) {
+					return
+				}
+				// bump counter key
+				counterKey := GetCounterKey(seri)
+				val := keys.Write(createdat.New(timestamp.Now()))
+				if err = txn.Set(counterKey, val); Chk.E(err) {
+					return
+				}
+				return
+			}
+			return
+		})
+		// whether it was a dupe or an error, we are done here.
+		return
+	}
+	var bin B
+	if bin, err = ev.MarshalBinary(bin); Chk.E(err) {
+		return
+	}
+	Log.T.F("saving event to badger %s", ev)
+	// otherwise, save new event record.
+	if err = r.Update(func(txn *badger.Txn) (err error) {
+		var idx []byte
+		var ser *serial.T
+		idx, ser = r.SerialKey()
+		// raw event store
+		if err = txn.Set(idx, bin); Chk.E(err) {
+			return
+		}
+		// add the indexes
+		var indexKeys [][]byte
+		indexKeys = GetIndexKeysForEvent(ev, ser)
+		for _, k := range indexKeys {
+			if err = txn.Set(k, nil); Chk.E(err) {
+				return
+			}
+		}
+		// initialise access counter key
+		counterKey := GetCounterKey(ser)
+		val := keys.Write(createdat.New(timestamp.Now()))
+		if err = txn.Set(counterKey, val); Chk.E(err) {
+			return
+		}
+		Log.T.F("event saved %0x %s", ev.ID, r.dataDir)
+		return
+	}); Chk.E(err) {
+		return
+	}
+	return
+}
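Taken together, a minimal usage sketch of the store this patch introduces (hypothetical wiring: the import path is assumed from go.mod, and ctx, wg, the event and the filter come from the caller, since their constructors live in the codec and util modules outside this patch):

```go
package main

import (
	"sync"

	"nostr.mleku.dev/codec/event"
	"nostr.mleku.dev/codec/filter"
	ratel "ratel.mleku.dev"
	"util.mleku.dev/context"
)

// roundTrip opens the store, saves one event and queries events back.
// ctx, wg, ev and f are supplied by the caller; a main function wiring them
// up is omitted because their constructors are not part of this patch.
func roundTrip(ctx context.T, wg *sync.WaitGroup, ev *event.T,
	f *filter.T) (evs []*event.T, err error) {
	// path, hasL2, blockCacheSize, logLevel, maxLimit (illustrative values)
	b := ratel.GetBackend(ctx, wg, "/tmp/ratel", false, 0, 0, 512)
	if err = b.Init(b.Path()); err != nil {
		return
	}
	defer b.Close()
	// eventstore.ErrDupEvent signals the event was already stored
	if err = b.SaveEvent(ctx, ev); err != nil {
		return
	}
	return b.QueryEvents(ctx, f)
}
```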