initial commit

This commit is contained in:
2024-09-11 11:19:58 +01:00
commit 6688f6ffc8
40 changed files with 2162 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

17
.idea/material_theme_project_new.xml generated Normal file
View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MaterialThemeProjectNewConfig">
<option name="metadata">
<MTProjectMetadataState>
<option name="migrated" value="true" />
<option name="pristineConfig" value="false" />
<option name="userId" value="3591a73d:191bebadc43:-7ff9" />
</MTProjectMetadataState>
</option>
<option name="titleBarState">
<MTProjectTitleBarConfigState>
<option name="overrideColor" value="false" />
</MTProjectTitleBarConfigState>
</option>
</component>
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ratel.mleku.dev.iml" filepath="$PROJECT_DIR$/.idea/ratel.mleku.dev.iml" />
</modules>
</component>
</project>

13
.idea/ratel.mleku.dev.iml generated Normal file
View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true">
<buildTags>
<option name="os" value="linux" />
</buildTags>
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

22
close.go Normal file
View File

@@ -0,0 +1,22 @@
package ratel
import (
. "nostr.mleku.dev"
)
func (r *T) Close() (err E) {
Log.I.F("closing database %s", r.Path())
if err = r.DB.Flatten(4); Chk.E(err) {
return
}
Log.D.F("database flattened")
if err = r.seq.Release(); Chk.E(err) {
return
}
Log.D.F("database released")
if err = r.DB.Close(); Chk.E(err) {
return
}
Log.I.F("database closed")
return
}

14
countevents.go Normal file
View File

@@ -0,0 +1,14 @@
package ratel
import (
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/filter"
)
func (r *T) CountEvents(c Ctx, f *filter.T) (count N, err E) {
var evs []*event.T
evs, err = r.QueryEvents(c, f)
count = len(evs)
return
}

9
del/del.go Normal file
View File

@@ -0,0 +1,9 @@
package del
import "bytes"
type Items [][]byte
func (c Items) Len() int { return len(c) }
func (c Items) Less(i, j int) bool { return bytes.Compare(c[i], c[j]) < 0 }
func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

79
deleteevent.go Normal file
View File

@@ -0,0 +1,79 @@
package ratel
import (
"github.com/dgraph-io/badger/v4"
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/eventid"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
)
func (r *T) DeleteEvent(c Ctx, eid *eventid.T) (err E) {
var foundSerial []byte
seri := serial.New(nil)
err = r.View(func(txn *badger.Txn) (err error) {
// query event by id to ensure we don't try to save duplicates
prf := index.Id.Key(id.New(eid))
it := txn.NewIterator(badger.IteratorOptions{})
defer it.Close()
it.Seek(prf)
if it.ValidForPrefix(prf) {
var k []byte
// get the serial
k = it.Item().Key()
// copy serial out
keys.Read(k, index.Empty(), id.New(eventid.New()), seri)
// save into foundSerial
foundSerial = seri.Val
}
return
})
if Chk.E(err) {
return
}
if foundSerial == nil {
return
}
var indexKeys []B
ev := &event.T{}
var evKey, evb, counterKey B
// fetch the event to get its index keys
err = r.View(func(txn *badger.Txn) (err error) {
// retrieve the event record
evKey = keys.Write(index.New(index.Event), seri)
it := txn.NewIterator(badger.IteratorOptions{})
defer it.Close()
it.Seek(evKey)
if it.ValidForPrefix(evKey) {
if evb, err = it.Item().ValueCopy(evb); Chk.E(err) {
return
}
if _, err = ev.MarshalJSON(evb); Chk.E(err) {
return
}
indexKeys = GetIndexKeysForEvent(ev, seri)
counterKey = GetCounterKey(seri)
return
}
return
})
if Chk.E(err) {
return
}
err = r.Update(func(txn *badger.Txn) (err E) {
if err = txn.Delete(evKey); Chk.E(err) {
}
for _, key := range indexKeys {
if err = txn.Delete(key); Chk.E(err) {
}
}
if err = txn.Delete(counterKey); Chk.E(err) {
return
}
return
})
return
}

14
getecounterkey.go Normal file
View File

@@ -0,0 +1,14 @@
package ratel
import (
. "nostr.mleku.dev"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
)
// GetCounterKey returns the proper counter key for a given event ID.
func GetCounterKey(ser *serial.T) (key B) {
key = index.Counter.Key(ser)
// Log.T.F("counter key %d %d", index.Counter, ser.Uint64())
return
}

93
getindexkeysforevent.go Normal file
View File

@@ -0,0 +1,93 @@
package ratel
import (
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/eventid"
"nostr.mleku.dev/codec/tag"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"ratel.mleku.dev/keys/serial"
)
// GetIndexKeysForEvent generates all the index keys required to filter for
// events. evtSerial should be the output of Serial() which gets a unique,
// monotonic counter value for each new event.
func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
var err error
keyz = make([][]byte, 0, 18)
ID := id.New(eventid.NewWith(ev.ID))
CA := createdat.New(ev.CreatedAt)
K := kinder.New(ev.Kind.ToU16())
PK, _ := pubkey.New(ev.PubKey)
// indexes
{ // ~ by id
k := index.Id.Key(ID, ser)
// Log.T.F("id key: %x %0x %0x", k[0], k[1:9], k[9:])
keyz = append(keyz, k)
}
{ // ~ by pubkey+date
k := index.Pubkey.Key(PK, CA, ser)
// Log.T.F("pubkey + date key: %x %0x %0x %0x",
// k[0], k[1:9], k[9:17], k[17:])
keyz = append(keyz, k)
}
{ // ~ by kind+date
k := index.Kind.Key(K, CA, ser)
Log.T.F("kind + date key: %x %0x %0x %0x",
k[0], k[1:3], k[3:11], k[11:])
keyz = append(keyz, k)
}
{ // ~ by pubkey+kind+date
k := index.PubkeyKind.Key(PK, K, CA, ser)
// Log.T.F("pubkey + kind + date key: %x %0x %0x %0x %0x",
// k[0], k[1:9], k[9:11], k[11:19], k[19:])
keyz = append(keyz, k)
}
// ~ by tag value + date
for i, t := range ev.Tags.T {
// there is no value field
if len(t.Field) < 2 ||
// the tag is not a-zA-Z probably (this would permit arbitrary other
// single byte chars)
len(t.Field[0]) != 1 ||
// the second field is zero length
len(t.Field[1]) == 0 ||
// the second field is more than 100 characters long
len(t.Field[1]) > 100 {
// any of the above is true then the tag is not indexable
continue
}
var firstIndex int
var tt *tag.T
for firstIndex, tt = range ev.Tags.T {
if len(tt.Field) >= 2 && Equals(tt.Field[1], t.Field[1]) {
break
}
}
if firstIndex != i {
// duplicate
continue
}
// get key prefix (with full length) and offset where to write the last
// parts
prf, elems := index.P(0), []keys.Element(nil)
if prf, elems, err = GetTagKeyElements(S(t.Field[1]), CA, ser); Chk.E(err) {
return
}
k := prf.Key(elems...)
Log.T.F("tag '%s': %s key %0x", t.Field[0], t.Field[1:], k)
keyz = append(keyz, k)
}
{ // ~ by date only
k := index.CreatedAt.Key(CA, ser)
// Log.T.F("date key: %x %0x %0x", k[0], k[1:9], k[9:])
keyz = append(keyz, k)
}
return
}

69
gettagkeyelements.go Normal file
View File

@@ -0,0 +1,69 @@
package ratel
import (
"strconv"
"strings"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/arb"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"ratel.mleku.dev/keys/serial"
"ec.mleku.dev/v2/schnorr"
"util.mleku.dev/hex"
)
func GetTagKeyElements(tagValue string, CA *createdat.T,
ser *serial.T) (prf index.P,
elems []keys.Element, err error) {
var pkb []byte
// first check if it might be a public key, fastest test
if len(tagValue) == 2*schnorr.PubKeyBytesLen {
// this could be a pubkey
pkb, err = hex.Dec(tagValue)
if err == nil {
// it's a pubkey
var pkk keys.Element
if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) {
return
}
prf, elems = index.Tag32, keys.Make(pkk, ser)
return
}
}
// check for a tag
if strings.Count(tagValue, ":") == 2 {
// this means we will get 3 pieces here
split := strings.Split(tagValue, ":")
// middle element should be a public key so must be 64 hex ciphers
if len(split[1]) != schnorr.PubKeyBytesLen*2 {
return
}
var k uint16
var d string
if pkb, err = hex.Dec(split[1]); !Chk.E(err) {
var kin uint64
if kin, err = strconv.ParseUint(split[0], 10, 16); err == nil {
k = uint16(kin)
d = split[2]
var pk *pubkey.T
if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) {
return
}
prf = index.TagAddr
elems = keys.Make(kinder.New(k), pk, arb.NewFromString(d), CA,
ser)
return
}
}
}
// store whatever as utf-8
prf = index.Tag
elems = keys.Make(arb.NewFromString(tagValue), CA, ser)
return
}

56
gettagkeyprefix.go Normal file
View File

@@ -0,0 +1,56 @@
package ratel
import (
"eventstore.mleku.dev"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/arb"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"util.mleku.dev/hex"
)
// GetTagKeyPrefix returns tag index prefixes based on the initial field of a
// tag.
//
// There is 3 types of index tag keys:
//
// - TagAddr: [ 8 ][ 2b Kind ][ 8b Pubkey ][ address/URL ][ 8b Serial ]
//
// - Tag32: [ 7 ][ 8b Pubkey ][ 8b Serial ]
//
// - Tag: [ 6 ][ address/URL ][ 8b Serial ]
//
// This function produces the initial bytes without the index.
func GetTagKeyPrefix(tagValue string) (key []byte, err error) {
if k, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 {
// store value in the new special "a" tag index
var pk *pubkey.T
if pk, err = pubkey.NewFromBytes(pkb); Chk.E(err) {
return
}
els := []keys.Element{kinder.New(k), pk}
if len(d) > 0 {
els = append(els, arb.NewFromString(d))
}
key = index.TagAddr.Key(els...)
} else if pkb, _ := hex.Dec(tagValue); len(pkb) == 32 {
// store value as bytes
var pkk *pubkey.T
if pkk, err = pubkey.NewFromBytes(pkb); Chk.E(err) {
return
}
key = index.Tag32.Key(pkk)
} else {
// store whatever as utf-8
if len(tagValue) > 0 {
var a *arb.T
a = arb.NewFromString(tagValue)
key = index.Tag.Key(a)
} else {
key = index.Tag.Key()
}
}
return
}

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module ratel.mleku.dev
go 1.22.7

9
go.work Normal file
View File

@@ -0,0 +1,9 @@
go 1.22.7
use (
.
)
replace nostr.mleku.dev => ../nostr.mleku.dev
replace util.mleku.dev => ../util.mleku.dev
replace eventstore.mleku.dev => ../eventstore.mleku.dev

98
init.go Normal file
View File

@@ -0,0 +1,98 @@
package ratel
import (
"encoding/binary"
"errors"
"fmt"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys/index"
"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"
"util.mleku.dev/units"
)
func (r *T) Init(path S) (err E) {
r.dataDir = path
Log.I.Ln("opening ratel event store at", r.Path())
opts := badger.DefaultOptions(r.dataDir)
opts.BlockCacheSize = int64(r.BlockCacheSize)
opts.BlockSize = units.Mb
opts.CompactL0OnClose = true
opts.LmaxCompaction = true
opts.Compression = options.None
// opts.Compression = options.ZSTD
r.Logger = NewLogger(r.InitLogLevel, r.dataDir)
opts.Logger = r.Logger
if r.DB, err = badger.Open(opts); Chk.E(err) {
return err
}
Log.T.Ln("getting event store sequence index", r.dataDir)
if r.seq, err = r.DB.GetSequence([]byte("events"), 1000); Chk.E(err) {
return err
}
Log.T.Ln("running migrations", r.dataDir)
if err = r.runMigrations(); Chk.E(err) {
return Log.E.Err("error running migrations: %w; %s", err, r.dataDir)
}
if r.DBSizeLimit > 0 {
// go r.GarbageCollector()
} else {
// go r.GCCount()
}
return nil
}
const Version = 1
func (r *T) runMigrations() (err error) {
return r.Update(func(txn *badger.Txn) (err error) {
var version uint16
var item *badger.Item
item, err = txn.Get([]byte{index.Version.B()})
if errors.Is(err, badger.ErrKeyNotFound) {
version = 0
} else if Chk.E(err) {
return err
} else {
Chk.E(item.Value(func(val []byte) (err error) {
version = binary.BigEndian.Uint16(val)
return
}))
}
// do the migrations in increasing steps (there is no rollback)
if version < Version {
// if there is any data in the relay we will stop and notify the user, otherwise we
// just set version to 1 and proceed
prefix := []byte{index.Id.B()}
it := txn.NewIterator(badger.IteratorOptions{
PrefetchValues: true,
PrefetchSize: 100,
Prefix: prefix,
})
defer it.Close()
hasAnyEntries := false
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
hasAnyEntries = true
break
}
if hasAnyEntries {
return fmt.Errorf("your database is at version %d, but in order to migrate up "+
"to version 1 you must manually export all the events and then import "+
"again:\n"+
"run an old version of this software, export the data, then delete the "+
"database files, run the new version, import the data back it", version)
}
Chk.E(r.bumpVersion(txn, Version))
}
return nil
})
}
func (r *T) bumpVersion(txn *badger.Txn, version uint16) error {
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, version)
return txn.Set([]byte{index.Version.B()}, buf)
}

85
keys/arb/arb.go Normal file
View File

@@ -0,0 +1,85 @@
package arb
import (
"bytes"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
)
// T is an arbitrary length byte string. In any construction there can only be one with arbitrary length. Custom lengths
// can be created by calling New with the custom length in it, both for Read and Write operations.
type T struct {
Val B
}
var _ keys.Element = &T{}
// New creates a new arb.T. This must have the expected length for the provided byte slice as this is what the Read
// method will aim to copy. In general this will be a bounded field, either the final or only arbitrary length field in
// a key.
func New(b B) (p *T) {
if len(b) == 0 {
Log.T.Ln("empty or nil slice is the same as zero value, " +
"use keys.ReadWithArbElem")
return &T{}
}
return &T{Val: b}
}
func NewWithLen(l int) (p *T) { return &T{Val: make([]byte, l)} }
func NewFromString(s S) (p *T) { return New([]byte(s)) }
func (p *T) Write(buf *bytes.Buffer) {
if len(p.Val) == 0 {
Log.T.Ln("empty slice has no effect")
return
}
buf.Write(p.Val)
}
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
if len(p.Val) < 1 {
Log.T.Ln("empty slice has no effect")
return
}
if _, err := buf.Read(p.Val); Chk.E(err) {
return nil
}
return p
}
func (p *T) Len() int {
if p == nil {
panic("uninitialized pointer to arb.T")
}
return len(p.Val)
}
// ReadWithArbElem is a variant of Read that recognises an arbitrary length element by its zero length and imputes its
// actual length by the byte buffer size and the lengths of the fixed length fields.
//
// For reasons of space efficiency, it is not practical to use TLVs for badger database key fields, so this will panic
// if there is more than one arbitrary length element.
func ReadWithArbElem(b B, elems ...keys.Element) {
var arbEl int
var arbSet bool
l := len(b)
for i, el := range elems {
elLen := el.Len()
l -= elLen
if elLen == 0 {
if arbSet {
panic("cannot have more than one arbitrary length field in a key")
}
arbEl = i
arbSet = true
}
}
// now we can say that the remainder is the correct length for the arb element
elems[arbEl] = New(make([]byte, l))
buf := bytes.NewBuffer(b)
for _, el := range elems {
el.Read(buf)
}
}

22
keys/arb/arb_test.go Normal file
View File

@@ -0,0 +1,22 @@
package arb
import (
"bytes"
"testing"
"lukechampine.com/frand"
)
func TestT(t *testing.T) {
randomBytes := frand.Bytes(frand.Intn(128))
v := New(randomBytes)
buf := new(bytes.Buffer)
v.Write(buf)
randomCopy := make([]byte, len(randomBytes))
buf2 := bytes.NewBuffer(buf.Bytes())
v2 := New(randomCopy)
el := v2.Read(buf2).(*T)
if bytes.Compare(el.Val, v.Val) != 0 {
t.Fatalf("expected %x got %x", v.Val, el.Val)
}
}

45
keys/count/count.go Normal file
View File

@@ -0,0 +1,45 @@
package count
import (
"nostr.mleku.dev/codec/timestamp"
)
type Item struct {
Serial uint64
Size uint32
Freshness *timestamp.T
}
type Items []*Item
func (c Items) Len() int { return len(c) }
func (c Items) Less(i, j int) bool { return c[i].Freshness.I64() < c[j].Freshness.I64() }
func (c Items) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c Items) Total() (total int) {
for i := range c {
total += int(c[i].Size)
}
return
}
type ItemsBySerial []*Item
func (c ItemsBySerial) Len() int { return len(c) }
func (c ItemsBySerial) Less(i, j int) bool { return c[i].Serial < c[j].Serial }
func (c ItemsBySerial) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ItemsBySerial) Total() (total int) {
for i := range c {
total += int(c[i].Size)
}
return
}
type Fresh struct {
Serial uint64
Freshness *timestamp.T
}
type Freshes []*Fresh
func (c Freshes) Len() int { return len(c) }
func (c Freshes) Less(i, j int) bool { return c[i].Serial < c[j].Serial }
func (c Freshes) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

View File

@@ -0,0 +1,47 @@
package createdat
import (
"bytes"
"encoding/binary"
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/timestamp"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/serial"
)
const Len = 8
type T struct {
Val *timestamp.T
}
var _ keys.Element = &T{}
func New(c *timestamp.T) (p *T) { return &T{Val: c} }
func (c *T) Write(buf *bytes.Buffer) {
buf.Write(c.Val.Bytes())
}
func (c *T) Read(buf *bytes.Buffer) (el keys.Element) {
b := make([]byte, Len)
if n, err := buf.Read(b); Chk.E(err) || n != Len {
return nil
}
c.Val = timestamp.FromUnix(int64(binary.BigEndian.Uint64(b)))
return c
}
func (c *T) Len() int { return Len }
// FromKey expects to find a datestamp in the 8 bytes before a serial in a key.
func FromKey(k []byte) (p *T) {
if len(k) < Len+serial.Len {
err := Errorf.F("cannot get a serial without at least %d bytes", Len+serial.Len)
panic(err)
}
key := make([]byte, 0, Len)
key = append(key, k[len(k)-Len-serial.Len:len(k)-serial.Len]...)
return &T{Val: timestamp.FromBytes(key)}
}

View File

@@ -0,0 +1,25 @@
package createdat
import (
"bytes"
"math"
"testing"
"lukechampine.com/frand"
"nostr.mleku.dev/codec/timestamp"
)
func TestT(t *testing.T) {
for _ = range 1000000 {
n := timestamp.FromUnix(int64(frand.Intn(math.MaxInt64)))
v := New(n)
buf := new(bytes.Buffer)
v.Write(buf)
buf2 := bytes.NewBuffer(buf.Bytes())
v2 := New(timestamp.New())
el := v2.Read(buf2).(*T)
if el.Val.Int() != n.Int() {
t.Fatalf("expected %d got %d", n.Int(), el.Val.Int())
}
}
}

58
keys/id/id.go Normal file
View File

@@ -0,0 +1,58 @@
package id
import (
"bytes"
"fmt"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
"strings"
"nostr.mleku.dev/codec/eventid"
"util.mleku.dev/hex"
)
const Len = 8
type T struct {
Val []byte
}
var _ keys.Element = &T{}
func New(evID ...*eventid.T) (p *T) {
if len(evID) < 1 || len(evID[0].String()) < 1 {
return &T{make([]byte, Len)}
}
evid := evID[0].String()
if len(evid) < 64 {
evid = strings.Repeat("0", 64-len(evid)) + evid
}
if len(evid) > 64 {
evid = evid[:64]
}
b, err := hex.Dec(evid[:Len*2])
if Chk.E(err) {
return
}
return &T{Val: b}
}
func (p *T) Write(buf *bytes.Buffer) {
if len(p.Val) != Len {
panic(fmt.Sprintln("must use New or initialize Val with len", Len))
}
buf.Write(p.Val)
}
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
// allow uninitialized struct
if len(p.Val) != Len {
p.Val = make([]byte, Len)
}
if n, err := buf.Read(p.Val); Chk.E(err) || n != Len {
return nil
}
return p
}
func (p *T) Len() int { return Len }

24
keys/id/id_test.go Normal file
View File

@@ -0,0 +1,24 @@
package id
import (
"bytes"
"testing"
sha256 "github.com/minio/sha256-simd"
"lukechampine.com/frand"
"nostr.mleku.dev/codec/eventid"
)
func TestT(t *testing.T) {
fakeIdBytes := frand.Bytes(sha256.Size)
id := eventid.NewWith(fakeIdBytes)
v := New(id)
buf := new(bytes.Buffer)
v.Write(buf)
buf2 := bytes.NewBuffer(buf.Bytes())
v2 := New()
el := v2.Read(buf2).(*T)
if bytes.Compare(el.Val, v.Val) != 0 {
t.Fatalf("expected %x got %x", v.Val, el.Val)
}
}

48
keys/index/index.go Normal file
View File

@@ -0,0 +1,48 @@
package index
import (
"bytes"
"fmt"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
)
const Len = 1
type T struct {
Val []byte
}
var _ keys.Element = &T{}
func New[V byte | P | int](code ...V) (p *T) {
var cod []byte
switch len(code) {
case 0:
cod = []byte{0}
default:
cod = []byte{byte(code[0])}
}
return &T{Val: cod}
}
func Empty() (p *T) {
return &T{Val: []byte{0}}
}
func (p *T) Write(buf *bytes.Buffer) {
if len(p.Val) != Len {
panic(fmt.Sprintln("must use New or initialize Val with len", Len))
}
buf.Write(p.Val)
}
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
p.Val = make([]byte, Len)
if n, err := buf.Read(p.Val); Chk.E(err) || n != Len {
return nil
}
return p
}
func (p *T) Len() int { return Len }

19
keys/index/index_test.go Normal file
View File

@@ -0,0 +1,19 @@
package index
import (
"bytes"
"testing"
)
func TestT(t *testing.T) {
v := Version.Key()
// v := New(n)
// buf := new(bytes.Buffer)
// v.Write(buf)
buf2 := bytes.NewBuffer(v)
v2 := New(0)
el := v2.Read(buf2).(*T)
if el.Val[0] != v[0] {
t.Fatalf("expected %d got %d", v[0], el.Val)
}
}

141
keys/index/prefixes.go Normal file
View File

@@ -0,0 +1,141 @@
package index
import (
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"ratel.mleku.dev/keys/serial"
)
type P byte
// Key writes a key with the P prefix byte and an arbitrary list of
// keys.Element.
func (p P) Key(element ...keys.Element) (b []byte) {
b = keys.Write(
append([]keys.Element{New(byte(p))}, element...)...)
// Log.T.F("key %x", b)
return
}
// B returns the index.P as a byte.
func (p P) B() byte { return byte(p) }
// I returns the index.P as an int (for use with the KeySizes.
func (p P) I() int { return int(p) }
// GetAsBytes todo wat is dis?
func GetAsBytes(prf ...P) (b [][]byte) {
b = make([][]byte, len(prf))
for i := range prf {
b[i] = []byte{byte(prf[i])}
}
return
}
const (
// Version is the key that stores the version number, the value is a 16-bit
// integer (2 bytes)
//
// [ 255 ][ 2 byte/16 bit version code ]
Version P = 255
)
const (
// Event is the prefix used with a Serial counter value provided by badgerDB to
// provide conflict-free 8 byte 64-bit unique keys for event records, which
// follows the prefix.
//
// [ 0 ][ 8 bytes Serial ]
Event P = iota
// CreatedAt creates an index key that contains the unix
// timestamp of the event record serial.
//
// [ 1 ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
CreatedAt
// Id contains the first 8 bytes of the ID of the event and the 8
// byte Serial of the event record.
//
// [ 2 ][ 8 bytes eventid.T prefix ][ 8 bytes Serial ]
Id
// Kind contains the kind and datestamp.
//
// [ 3 ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
Kind
// Pubkey contains pubkey prefix and timestamp.
//
// [ 4 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
Pubkey
// PubkeyKind contains pubkey prefix, kind and timestamp.
//
// [ 5 ][ 8 bytes pubkey prefix ][ 2 bytes kind.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
PubkeyKind
// Tag is for miscellaneous arbitrary length tags, with timestamp and event
// serial after.
//
// [ 6 ][ tag string 1 <= 100 bytes ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
Tag
// Tag32 contains the 8 byte pubkey prefix, timestamp and serial.
//
// [ 7 ][ 8 bytes pubkey prefix ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
Tag32
// TagAddr contains the kind, pubkey prefix, value (index 2) of address tag (eg
// relay address), followed by timestamp and serial.
//
// [ 8 ][ 2 byte kind.T][ 8 byte pubkey prefix ][ network address ][ 8 byte timestamp.T ][ 8 byte Serial ]
TagAddr
// Counter is the eventid.T prefix, value stores the average time of access
// (average of all access timestamps) and the size of the record.
//
// [ 9 ][ 8 bytes Serial ] : value: [ 8 bytes timestamp ]
Counter
)
// FilterPrefixes is a slice of the prefixes used by filter index to enable a loop
// for pulling events matching a serial
var FilterPrefixes = [][]byte{
{CreatedAt.B()},
{Id.B()},
{Kind.B()},
{Pubkey.B()},
{PubkeyKind.B()},
{Tag.B()},
{Tag32.B()},
{TagAddr.B()},
}
// KeySizes are the byte size of keys of each type of key prefix. int(P) or call the P.I() method
// corresponds to the index 1:1. For future index additions be sure to add the
// relevant KeySizes sum as it describes the data for a programmer.
var KeySizes = []int{
// Event
1 + serial.Len,
// CreatedAt
1 + createdat.Len + serial.Len,
// Id
1 + id.Len + serial.Len,
// Kind
1 + kinder.Len + createdat.Len + serial.Len,
// Pubkey
1 + pubkey.Len + createdat.Len + serial.Len,
// PubkeyKind
1 + pubkey.Len + kinder.Len + createdat.Len + serial.Len,
// Tag (worst case scenario)
1 + 100 + createdat.Len + serial.Len,
// Tag32
1 + pubkey.Len + createdat.Len + serial.Len,
// TagAddr
1 + kinder.Len + pubkey.Len + 100 + createdat.Len + serial.Len,
// Counter
1 + serial.Len,
}

43
keys/keys.go Normal file
View File

@@ -0,0 +1,43 @@
// Package keys is a composable framework for constructing badger keys from
// fields of events.
package keys
import (
"bytes"
)
// Element is an enveloper for a type that can Read and Write its binary form.
type Element interface {
// Write the binary form of the field into the given bytes.Buffer.
Write(buf *bytes.Buffer)
// Read accepts a bytes.Buffer and decodes a field from it.
Read(buf *bytes.Buffer) Element
// Len gives the length of the bytes output by the type.
Len() int
}
// Write the contents of each Element to a byte slice.
func Write(elems ...Element) []byte {
// get the length of the buffer required
var length int
for _, el := range elems {
length += el.Len()
}
buf := bytes.NewBuffer(make([]byte, 0, length))
// write out the data from each element
for _, el := range elems {
el.Write(buf)
}
return buf.Bytes()
}
// Read the contents of a byte slice into the provided list of Element types.
func Read(b []byte, elems ...Element) {
buf := bytes.NewBuffer(b)
for _, el := range elems {
el.Read(buf)
}
}
// Make is a convenience method to wrap a list of Element into a slice.
func Make(elems ...Element) []Element { return elems }

127
keys/keys_test.go Normal file
View File

@@ -0,0 +1,127 @@
// package keys_test needs to be a different package name or the implementation
// types imports will circular
package keys_test
import (
"bytes"
"crypto/sha256"
"testing"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"ratel.mleku.dev/keys/serial"
"ec.mleku.dev/v2/schnorr"
"lukechampine.com/frand"
"nostr.mleku.dev/codec/eventid"
"nostr.mleku.dev/codec/kind"
"nostr.mleku.dev/codec/timestamp"
)
func TestElement(t *testing.T) {
for _ = range 100000 {
var failed bool
{ // construct a typical key type of structure
// a prefix
np := index.Version
vp := index.New(byte(np))
// an id
fakeIdBytes := frand.Bytes(sha256.Size)
i := eventid.NewWith(fakeIdBytes)
vid := id.New(i)
// a kinder
n := kind.New(1059)
vk := kinder.New(n.K)
// a pubkey
fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen)
var vpk *pubkey.T
var err error
vpk, err = pubkey.NewFromBytes(fakePubkeyBytes)
if err != nil {
t.Fatal(err)
}
// a createdat
ts := timestamp.Now()
vca := createdat.New(ts)
// a serial
fakeSerialBytes := frand.Bytes(serial.Len)
vs := serial.New(fakeSerialBytes)
// write Element list into buffer
b := keys.Write(vp, vid, vk, vpk, vca, vs)
// check that values decoded all correctly
// we expect the following types, so we must create them:
var vp2 = index.New(0)
var vid2 = id.New()
var vk2 = kinder.New(0)
var vpk2 *pubkey.T
vpk2, err = pubkey.New()
if err != nil {
t.Fatal(err)
}
var vca2 = createdat.New(timestamp.New())
var vs2 = serial.New(nil)
// read it in
keys.Read(b, vp2, vid2, vk2, vpk2, vca2, vs2)
// this is a lot of tests, so use switch syntax
switch {
case bytes.Compare(vp.Val, vp2.Val) != 0:
t.Logf("failed to decode correctly got %v expected %v", vp2.Val,
vp.Val)
failed = true
fallthrough
case bytes.Compare(vid.Val, vid2.Val) != 0:
t.Logf("failed to decode correctly got %v expected %v", vid2.Val,
vid.Val)
failed = true
fallthrough
case vk.Val.ToU16() != vk2.Val.ToU16():
t.Logf("failed to decode correctly got %v expected %v", vk2.Val,
vk.Val)
failed = true
fallthrough
case !bytes.Equal(vpk.Val, vpk2.Val):
t.Logf("failed to decode correctly got %v expected %v", vpk2.Val,
vpk.Val)
failed = true
fallthrough
case vca.Val.I64() != vca2.Val.I64():
t.Logf("failed to decode correctly got %v expected %v", vca2.Val,
vca.Val)
failed = true
fallthrough
case !bytes.Equal(vs.Val, vs2.Val):
t.Logf("failed to decode correctly got %v expected %v", vpk2.Val,
vpk.Val)
failed = true
}
}
{ // construct a counter value
// a createdat
ts := timestamp.Now()
vca := createdat.New(ts)
// a sizer
// n := uint32(frand.Uint64n(math.MaxUint32))
// write out values
b := keys.Write(vca)
// check that values decoded all correctly
// we expect the following types, so we must create them:
var vca2 = createdat.New(timestamp.New())
// read it in
keys.Read(b, vca2)
// check they match
if vca.Val.I64() != vca2.Val.I64() {
t.Logf("failed to decode correctly got %v expected %v", vca2.Val,
vca.Val)
failed = true
}
}
if failed {
t.FailNow()
}
}
}

43
keys/kinder/kind.go Normal file
View File

@@ -0,0 +1,43 @@
package kinder
import (
"bytes"
"encoding/binary"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
"nostr.mleku.dev/codec/kind"
)
const Len = 2
type T struct {
Val *kind.T
}
var _ keys.Element = &T{}
// New creates a new kinder.T for reading/writing kind.T values.
func New[V uint16 | int](c V) (p *T) { return &T{Val: kind.New(c)} }
func Make(c *kind.T) (v []byte) {
v = make([]byte, Len)
binary.BigEndian.PutUint16(v, c.K)
return
}
func (c *T) Write(buf *bytes.Buffer) {
buf.Write(Make(c.Val))
}
func (c *T) Read(buf *bytes.Buffer) (el keys.Element) {
b := make([]byte, Len)
if n, err := buf.Read(b); Chk.E(err) || n != Len {
return nil
}
v := binary.BigEndian.Uint16(b)
c.Val = kind.New(v)
return c
}
func (c *T) Len() int { return Len }

21
keys/kinder/kind_test.go Normal file
View File

@@ -0,0 +1,21 @@
package kinder
import (
"bytes"
"testing"
"nostr.mleku.dev/codec/kind"
)
func TestT(t *testing.T) {
n := kind.New(1059)
v := New(n.ToU16())
buf := new(bytes.Buffer)
v.Write(buf)
buf2 := bytes.NewBuffer(buf.Bytes())
v2 := New(0)
el := v2.Read(buf2).(*T)
if el.Val.ToU16() != n.ToU16() {
t.Fatalf("expected %d got %d", n, el.Val)
}
}

75
keys/pubkey/pubkey.go Normal file
View File

@@ -0,0 +1,75 @@
package pubkey
import (
"bytes"
"fmt"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
"ec.mleku.dev/v2/schnorr"
)
const Len = 8
type T struct {
Val []byte
}
var _ keys.Element = &T{}
// New creates a new pubkey prefix, if parameter is omitted, new one is
// allocated (for read) if more than one is given, only the first is used, and
// if the first one is not the correct hexadecimal length of 64, return error.
func New(pk ...B) (p *T, err E) {
if len(pk) < 1 {
// allows init with no parameter
return &T{make([]byte, Len)}, nil
}
// // only the first pubkey will be used
// if len(pk[0]) != schnorr.PubKeyBytesLen*2 {
// err = Log.E.Err("pubkey hex must be 64 chars, got", len(pk[0]))
// return
// }
// b, err := hex.Dec(pk[0][:Len*2])
// if Chk.E(err) {
// return
// }
return &T{Val: pk[0][:Len]}, nil
}
func NewFromBytes(pkb []byte) (p *T, err error) {
if len(pkb) != schnorr.PubKeyBytesLen {
err = Log.E.Err("provided key not correct length, got %d expected %d",
len(pkb), schnorr.PubKeyBytesLen)
Log.T.S(pkb)
return
}
b := make([]byte, Len)
copy(b, pkb[:Len])
p = &T{Val: b}
return
}
func (p *T) Write(buf *bytes.Buffer) {
if p == nil {
panic("nil pubkey")
}
if p.Val == nil || len(p.Val) != Len {
panic(fmt.Sprintln("must use New or initialize Val with len", Len))
}
buf.Write(p.Val)
}
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
// allow uninitialized struct
if len(p.Val) != Len {
p.Val = make([]byte, Len)
}
if n, err := buf.Read(p.Val); Chk.E(err) || n != Len {
return nil
}
return p
}
func (p *T) Len() int { return Len }

View File

@@ -0,0 +1,28 @@
package pubkey
import (
"bytes"
. "nostr.mleku.dev"
"testing"
"ec.mleku.dev/v2/schnorr"
"lukechampine.com/frand"
)
func TestT(t *testing.T) {
for _ = range 10000000 {
fakePubkeyBytes := frand.Bytes(schnorr.PubKeyBytesLen)
v, err := New(fakePubkeyBytes)
if Chk.E(err) {
t.FailNow()
}
buf := new(bytes.Buffer)
v.Write(buf)
buf2 := bytes.NewBuffer(buf.Bytes())
v2, _ := New()
el := v2.Read(buf2).(*T)
if bytes.Compare(el.Val, v.Val) != 0 {
t.Fatalf("expected %x got %x", v.Val, el.Val)
}
}
}

81
keys/serial/serial.go Normal file
View File

@@ -0,0 +1,81 @@
package serial
import (
"bytes"
"encoding/binary"
"fmt"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys"
)
const Len = 8
// T is a badger DB serial number used for conflict free event record keys.
type T struct {
Val []byte
}
var _ keys.Element = &T{}
// New returns a new serial record key.Element - if nil or short slice is given,
// initialize a fresh one with Len (for reading), otherwise if equal or longer,
// trim if long and store into struct (for writing).
func New(ser []byte) (p *T) {
switch {
case len(ser) < Len:
// Log.I.Ln("empty serial")
// allows use of nil to init
ser = make([]byte, Len)
default:
ser = ser[:Len]
}
return &T{Val: ser}
}
// FromKey expects the last Len bytes of the given slice to be the serial.
func FromKey(k []byte) (p *T) {
if len(k) < Len {
panic(fmt.Sprintf("cannot get a serial without at least 8 bytes %x", k))
}
key := make([]byte, Len)
copy(key, k[len(k)-Len:])
return &T{Val: key}
}
func Make(s uint64) (ser []byte) {
ser = make([]byte, 8)
binary.BigEndian.PutUint64(ser, s)
return
}
func (p *T) Write(buf *bytes.Buffer) {
if len(p.Val) != Len {
panic(fmt.Sprintln("must use New or initialize Val with len", Len))
}
buf.Write(p.Val)
}
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
// allow uninitialized struct
if len(p.Val) != Len {
p.Val = make([]byte, Len)
}
if n, err := buf.Read(p.Val); Chk.E(err) || n != Len {
return nil
}
return p
}
func (p *T) Len() int { return Len }
func (p *T) Uint64() (u uint64) { return binary.BigEndian.Uint64(p.Val) }
// Match compares a key bytes to a serial, all indexes have the serial at
// the end indicating the event record they refer to, and if they match returns
// true.
func Match(index, ser []byte) bool {
l := len(index)
if l < Len {
return false
}
return bytes.Compare(index[l-Len:], ser) == 0
}

View File

@@ -0,0 +1,22 @@
package serial_test
import (
"bytes"
"ratel.mleku.dev/keys/serial"
"testing"
"lukechampine.com/frand"
)
func TestT(t *testing.T) {
fakeSerialBytes := frand.Bytes(serial.Len)
v := serial.New(fakeSerialBytes)
buf := new(bytes.Buffer)
v.Write(buf)
buf2 := bytes.NewBuffer(buf.Bytes())
v2 := &serial.T{} // or can use New(nil)
el := v2.Read(buf2).(*serial.T)
if bytes.Compare(el.Val, v.Val) != 0 {
t.Fatalf("expected %x got %x", v.Val, el.Val)
}
}

64
log.go Normal file
View File

@@ -0,0 +1,64 @@
package ratel
import (
"fmt"
"runtime"
"strings"
. "nostr.mleku.dev"
"util.mleku.dev/atomic"
"util.mleku.dev/lol"
)
func NewLogger(logLevel int, label string) (l *logger) {
Log.T.Ln("getting logger for", label)
l = &logger{Label: label}
l.Level.Store(int32(logLevel))
return
}
type logger struct {
Level atomic.Int32
Label string
}
func (l *logger) SetLogLevel(level int) {
l.Level.Store(int32(level))
}
func (l *logger) Errorf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Error {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
Log.E.F("%s %s:%d", strings.TrimSpace(txt), file, line)
}
}
func (l *logger) Warningf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Warn {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
Log.W.F("%s %s:%d", strings.TrimSpace(txt), file, line)
}
}
func (l *logger) Infof(s string, i ...interface{}) {
if l.Level.Load() >= lol.Info {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line)
}
}
func (l *logger) Debugf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Debug {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
Log.T.F("%s %s:%d", strings.TrimSpace(txt), file, line)
}
}

125
main.go Normal file
View File

@@ -0,0 +1,125 @@
package ratel
import (
"encoding/binary"
"sync"
"time"
"eventstore.mleku.dev"
"github.com/dgraph-io/badger/v4"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
"util.mleku.dev/context"
)
type T struct {
Ctx context.T
WG *sync.WaitGroup
dataDir string
// DBSizeLimit is the number of bytes we want to keep the data store from exceeding.
DBSizeLimit int
// DBLowWater is the percentage of DBSizeLimit a GC run will reduce the used storage down
// to.
DBLowWater int
// DBHighWater is the trigger point at which a GC run should start if exceeded.
DBHighWater int
// GCFrequency is the frequency of checks of the current utilisation.
GCFrequency time.Duration
HasL2 bool
BlockCacheSize int
InitLogLevel int
Logger *logger
// DB is the badger db enveloper
*badger.DB
// seq is the monotonic collision free index for raw event storage.
seq *badger.Sequence
// Threads is how many CPU threads we dedicate to concurrent actions, flatten and GC mark
Threads int
// MaxLimit is a default limit that applies to a query without a limit, to avoid sending out
// too many events to a client from a malformed or excessively broad filter.
MaxLimit int
// ActuallyDelete sets whether we actually delete or rewrite deleted entries with a modified
// deleted prefix value (8th bit set)
ActuallyDelete bool
}
var _ eventstore.I = (*T)(nil)
// GetBackend returns a reasonably configured badger.Backend.
//
// The variadic params correspond to DBSizeLimit, DBLowWater, DBHighWater and
// GCFrequency as an integer multiplier of number of seconds.
//
// Note that the cancel function for the context needs to be managed by the
// caller.
func GetBackend(Ctx context.T, WG *sync.WaitGroup, path S, hasL2 bool,
blockCacheSize, logLevel,
maxLimit int, params ...int) (b *T) {
var sizeLimit, lw, hw, freq = 0, 86, 92, 60
switch len(params) {
case 4:
freq = params[3]
fallthrough
case 3:
hw = params[2]
fallthrough
case 2:
lw = params[1]
fallthrough
case 1:
sizeLimit = params[0]
}
// if unset, assume a safe maximum limit for unlimited filters.
if maxLimit == 0 {
maxLimit = 512
}
b = &T{
Ctx: Ctx,
WG: WG,
DBSizeLimit: sizeLimit,
DBLowWater: lw,
DBHighWater: hw,
GCFrequency: time.Duration(freq) * time.Second,
HasL2: hasL2,
BlockCacheSize: blockCacheSize,
InitLogLevel: logLevel,
MaxLimit: maxLimit,
dataDir: path,
}
return
}
func (r *T) Path() S { return r.dataDir }
// SerialKey returns a key used for storing events, and the raw serial counter
// bytes to copy into index keys.
func (r *T) SerialKey() (idx []byte, ser *serial.T) {
var err error
var s []byte
if s, err = r.SerialBytes(); Chk.E(err) {
panic(err)
}
ser = serial.New(s)
return index.Event.Key(ser), ser
}
// Serial returns the next monotonic conflict free unique serial on the database.
func (r *T) Serial() (ser uint64, err error) {
if ser, err = r.seq.Next(); Chk.E(err) {
}
// Log.T.F("serial %x", ser)
return
}
// SerialBytes returns a new serial value, used to store an event record with a
// conflict-free unique code (it is a monotonic, atomic, ascending counter).
func (r *T) SerialBytes() (ser []byte, err error) {
var serU64 uint64
if serU64, err = r.Serial(); Chk.E(err) {
panic(err)
}
ser = make([]byte, serial.Len)
binary.BigEndian.PutUint64(ser, serU64)
return
}

28
nuke.go Normal file
View File

@@ -0,0 +1,28 @@
package ratel
import (
. "nostr.mleku.dev"
"ratel.mleku.dev/keys/index"
)
func (r *T) Nuke() (err E) {
Log.W.F("nukening database at %s", r.dataDir)
if err = r.DB.DropPrefix([][]byte{
{index.Event.B()},
{index.CreatedAt.B()},
{index.Id.B()},
{index.Kind.B()},
{index.Pubkey.B()},
{index.PubkeyKind.B()},
{index.Tag.B()},
{index.Tag32.B()},
{index.TagAddr.B()},
{index.Counter.B()},
}...); Chk.E(err) {
return
}
if err = r.DB.RunValueLogGC(0.8); Chk.E(err) {
return
}
return
}

184
preparequeries.go Normal file
View File

@@ -0,0 +1,184 @@
package ratel
import (
"encoding/binary"
"fmt"
"math"
. "nostr.mleku.dev"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/kinder"
"ratel.mleku.dev/keys/pubkey"
"ratel.mleku.dev/keys/serial"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/eventid"
"nostr.mleku.dev/codec/filter"
"nostr.mleku.dev/codec/timestamp"
)
type Results struct {
Ev *event.T
TS *timestamp.T
Ser *serial.T
}
type query struct {
index int
queryFilter *filter.T
searchPrefix []byte
start []byte
skipTS bool
}
// PrepareQueries analyses a filter and generates a set of query specs that produce
// key prefixes to search for in the badger key indexes.
func PrepareQueries(f *filter.T) (
qs []query,
ext *filter.T,
since uint64,
err error,
) {
if f == nil {
err = Errorf.E("filter cannot be nil")
}
switch {
// first if there is IDs, just search for them, this overrides all other filters
case len(f.IDs.Field) > 0:
qs = make([]query, f.IDs.Len())
for i, idHex := range f.IDs.Field {
ih := id.New(eventid.NewWith(B(idHex)))
if ih == nil {
Log.E.F("failed to decode event ID: %s", idHex)
// just ignore it, clients will be clients
continue
}
prf := index.Id.Key(ih)
// Log.T.F("id prefix to search on %0x from key %0x", prf, ih.Val)
qs[i] = query{
index: i,
queryFilter: f,
searchPrefix: prf,
skipTS: true,
}
}
// Log.T.S("ids", qs)
// second we make a set of queries based on author pubkeys, optionally with kinds
case f.Authors.Len() > 0:
// if there is no kinds, we just make the queries based on the author pub keys
if f.Kinds.Len() == 0 {
qs = make([]query, f.Authors.Len())
for i, pubkeyHex := range f.Authors.Field {
var pk *pubkey.T
if pk, err = pubkey.New(pubkeyHex); Chk.E(err) {
// bogus filter, continue anyway
continue
}
sp := index.Pubkey.Key(pk)
// Log.I.F("search only for authors %0x from pub key %0x", sp, pk.Val)
qs[i] = query{
index: i,
queryFilter: f,
searchPrefix: sp,
}
}
// Log.I.S("authors", qs)
} else {
// if there is kinds as well, we are searching via the kind/pubkey prefixes
qs = make([]query, f.Authors.Len()*f.Kinds.Len())
i := 0
authors:
for _, pubkeyHex := range f.Authors.Field {
for _, kind := range f.Kinds.K {
var pk *pubkey.T
if pk, err = pubkey.New(pubkeyHex); Chk.E(err) {
// skip this dodgy thing
continue authors
}
ki := kinder.New(kind.K)
sp := index.PubkeyKind.Key(pk, ki)
// Log.T.F("search for authors from pub key %0x and kind %0x", pk.Val, ki.Val)
qs[i] = query{index: i, queryFilter: f, searchPrefix: sp}
i++
}
}
// Log.T.S("authors/kinds", qs)
}
if f.Tags != nil && f.Tags.T != nil || f.Tags.Len() > 0 {
ext = &filter.T{Tags: f.Tags}
// Log.T.S("extra filter", ext)
}
case f.Tags.Len() > 0:
// determine the size of the queries array by inspecting all tags sizes
size := 0
for _, values := range f.Tags.T {
size += values.Len() - 1
}
if size == 0 {
return nil, nil, 0, fmt.Errorf("empty tag filters")
}
// we need a query for each tag search
qs = make([]query, size)
// and any kinds mentioned as well in extra filter
ext = &filter.T{Kinds: f.Kinds}
i := 0
Log.T.S(f.Tags.T)
for _, values := range f.Tags.T {
Log.T.S(values.Field)
for _, value := range values.Field[1:] {
// get key prefix (with full length) and offset where to write the last parts
var prf []byte
if prf, err = GetTagKeyPrefix(S(value)); Chk.E(err) {
continue
}
// remove the last part to get just the prefix we want here
Log.T.F("search for tags from %0x", prf)
qs[i] = query{index: i, queryFilter: f, searchPrefix: prf}
i++
}
}
// Log.T.S("tags", qs)
case f.Kinds.Len() > 0:
// if there is no ids, pubs or tags, we are just searching for kinds
qs = make([]query, f.Kinds.Len())
for i, kind := range f.Kinds.K {
kk := kinder.New(kind.K)
ki := index.Kind.Key(kk)
qs[i] = query{
index: i,
queryFilter: f,
searchPrefix: ki,
}
}
// Log.T.S("kinds", qs)
default:
if len(qs) > 0 {
qs[0] = query{index: 0, queryFilter: f,
searchPrefix: index.CreatedAt.Key()}
ext = nil
}
// Log.T.S("other", qs)
}
var until uint64 = math.MaxUint64
if f.Until != nil {
if fu := uint64(*f.Until); fu < until {
until = fu - 1
}
}
for i, q := range qs {
qs[i].start = binary.BigEndian.AppendUint64(q.searchPrefix, until)
}
// this is where we'll end the iteration
if f.Since != nil {
if fs := uint64(*f.Since); fs > since {
since = fs
}
}
// if we got an empty filter, we still need a query for scraping the newest
if len(qs) == 0 {
qs = append(qs, query{index: 0, queryFilter: f, searchPrefix: B{1},
start: B{1, 255, 255, 255, 255, 255, 255, 255, 255}})
}
return
}

159
queryevents.go Normal file
View File

@@ -0,0 +1,159 @@
package ratel
import (
"github.com/dgraph-io/badger/v4"
sha256 "github.com/minio/sha256-simd"
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/filter"
"nostr.mleku.dev/codec/tag"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
)
func (r *T) QueryEvents(c Ctx, f *filter.T) (evs []*event.T, err E) {
Log.T.F("query for events\n%s", f)
var queries []query
var extraFilter *filter.T
var since uint64
if queries, extraFilter, since, err = PrepareQueries(f); Chk.E(err) {
return
}
var limit bool
if f.Limit != 0 {
Log.T.S("query has a limit")
limit = true
}
Log.T.S(queries, extraFilter)
// search for the keys generated from the filter
var eventKeys [][]byte
for _, q := range queries {
Log.T.S(q, extraFilter)
err = r.View(func(txn *badger.Txn) (err E) {
// iterate only through keys and in reverse order
opts := badger.IteratorOptions{
Reverse: true,
}
it := txn.NewIterator(opts)
defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() {
item := it.Item()
k := item.KeyCopy(nil)
Log.T.S(k)
if !q.skipTS {
if len(k) < createdat.Len+serial.Len {
continue
}
createdAt := createdat.FromKey(k)
Log.T.F("%d < %d", createdAt.Val.U64(), since)
if createdAt.Val.U64() < since {
break
}
}
ser := serial.FromKey(k)
eventKeys = append(eventKeys, index.Event.Key(ser))
}
return
})
if Chk.E(err) {
// this can't actually happen because the View function above does not set err.
}
search:
for _, eventKey := range eventKeys {
// Log.I.S(eventKey)
var v B
err = r.View(func(txn *badger.Txn) (err E) {
opts := badger.IteratorOptions{Reverse: true}
it := txn.NewIterator(opts)
defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() {
item := it.Item()
k := item.KeyCopy(nil)
Log.T.S(k)
if v, err = item.ValueCopy(nil); Chk.E(err) {
continue
}
if r.HasL2 && len(v) == sha256.Size {
// this is a stub entry that indicates an L2 needs to be accessed for it, so
// we populate only the event.T.ID and return the result, the caller will
// expect this as a signal to query the L2 event store.
ev := &event.T{}
Log.T.F("found event stub %0x must seek in L2", v)
ev.ID = v
select {
case <-c.Done():
return
case <-r.Ctx.Done():
Log.T.Ln("backend context canceled")
return
default:
}
evs = append(evs, ev)
return
}
}
return
})
if v == nil {
continue
}
ev := &event.T{}
var rem B
if rem, err = ev.UnmarshalBinary(v); Chk.E(err) {
return
}
Log.T.S(ev)
if len(rem) > 0 {
Log.T.S(rem)
}
// check if this matches the other filters that were not part of the index.
if extraFilter == nil || extraFilter.Matches(ev) {
// check if this event is replaced by one we already have in the result.
if ev.Kind.IsReplaceable() {
for _, evc := range evs {
// replaceable means there should be only the newest for the pubkey and
// kind.
if Equals(ev.PubKey, evc.PubKey) && ev.Kind.Equal(evc.Kind) {
// we won't add it to the results slice
continue search
}
}
}
if ev.Kind.IsParameterizedReplaceable() &&
ev.Tags.GetFirst(tag.New("d")) != nil {
for _, evc := range evs {
// parameterized replaceable means there should only be the newest for a
// pubkey, kind and the value field of the `d` tag.
if ev.Kind.Equal(evc.Kind) && Equals(ev.PubKey, evc.PubKey) &&
Equals(ev.Tags.GetFirst(tag.New("d")).Value(),
ev.Tags.GetFirst(tag.New("d")).Value()) {
// we won't add it to the results slice
continue search
}
}
}
Log.T.F("sending back result\n%s\n", ev)
evs = append(evs, ev)
if limit {
f.Limit--
if f.Limit == 0 {
return
}
} else {
// if there is no limit, cap it at the MaxLimit, assume this was the intent
// or the client is erroneous, if any limit greater is requested this will
// be used instead as the previous clause.
if len(evs) > r.MaxLimit {
return
}
}
}
}
}
Log.T.S(evs)
Log.T.Ln("query complete")
return
}

130
saveevent.go Normal file
View File

@@ -0,0 +1,130 @@
package ratel
import (
"fmt"
"eventstore.mleku.dev"
"github.com/dgraph-io/badger/v4"
sha256 "github.com/minio/sha256-simd"
. "nostr.mleku.dev"
"nostr.mleku.dev/codec/event"
"nostr.mleku.dev/codec/eventid"
"nostr.mleku.dev/codec/timestamp"
"ratel.mleku.dev/keys"
"ratel.mleku.dev/keys/createdat"
"ratel.mleku.dev/keys/id"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
)
func (r *T) SaveEvent(c Ctx, ev *event.T) (err E) {
if ev.Kind.IsEphemeral() {
Log.T.F("not saving ephemeral event\n%s", ev.Serialize())
// send it out
return
}
Log.T.C(func() S {
evs, _ := ev.MarshalJSON(nil)
return fmt.Sprintf("saving event\n%d %s", len(evs), evs)
})
// make sure Close waits for this to complete
r.WG.Add(1)
defer r.WG.Done()
// first, search to see if the event ID already exists.
var foundSerial []byte
seri := serial.New(nil)
err = r.View(func(txn *badger.Txn) (err error) {
// query event by id to ensure we don't try to save duplicates
prf := index.Id.Key(id.New(eventid.NewWith(ev.ID)))
it := txn.NewIterator(badger.IteratorOptions{})
defer it.Close()
it.Seek(prf)
if it.ValidForPrefix(prf) {
var k []byte
// get the serial
k = it.Item().Key()
// copy serial out
keys.Read(k, index.Empty(), id.New(eventid.New()), seri)
// save into foundSerial
foundSerial = seri.Val
}
return
})
if Chk.E(err) {
return
}
if foundSerial != nil {
Log.T.Ln("found possible duplicate or stub for %s", ev)
err = r.Update(func(txn *badger.Txn) (err error) {
// retrieve the event record
evKey := keys.Write(index.New(index.Event), seri)
it := txn.NewIterator(badger.IteratorOptions{})
defer it.Close()
it.Seek(evKey)
if it.ValidForPrefix(evKey) {
if it.Item().ValueSize() != sha256.Size {
// not a stub, we already have it
Log.T.Ln("duplicate event", ev.ID)
return eventstore.ErrDupEvent
}
// we only need to restore the event binary and write the access counter key
// encode to binary
var bin B
if bin, err = ev.MarshalBinary(bin); Chk.E(err) {
return
}
if err = txn.Set(it.Item().Key(), bin); Chk.E(err) {
return
}
// bump counter key
counterKey := GetCounterKey(seri)
val := keys.Write(createdat.New(timestamp.Now()))
if err = txn.Set(counterKey, val); Chk.E(err) {
return
}
return
}
return
})
// if it was a dupe, we are done.
if err != nil {
return
}
return
}
var bin B
if bin, err = ev.MarshalBinary(bin); Chk.E(err) {
return
}
Log.T.F("saving event to badger %s", ev)
// otherwise, save new event record.
if err = r.Update(func(txn *badger.Txn) (err error) {
var idx []byte
var ser *serial.T
idx, ser = r.SerialKey()
// encode to binary
// raw event store
if err = txn.Set(idx, bin); Chk.E(err) {
return
}
// add the indexes
var indexKeys [][]byte
indexKeys = GetIndexKeysForEvent(ev, ser)
for _, k := range indexKeys {
if err = txn.Set(k, nil); Chk.E(err) {
return
}
}
// initialise access counter key
counterKey := GetCounterKey(ser)
val := keys.Write(createdat.New(timestamp.Now()))
if err = txn.Set(counterKey, val); Chk.E(err) {
return
}
Log.T.F("event saved %0x %s", ev.ID, r.dataDir)
return
}); Chk.E(err) {
return
}
return
}