add nuke method, integrate counter index, add full-id index
it is now simple to upgrade to the new version, export the db, then nuke, and import again and voila, all indexes now are generated
This commit is contained in:
@@ -26,13 +26,14 @@ func (r *T) CountEvents(c context.T, f *filter.T) (count int, approx bool, err e
|
||||
if queries, extraFilter, since, err = PrepareQueries(f); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// search for the keys generated from the filter
|
||||
var delEvs [][]byte
|
||||
defer func() {
|
||||
// after the count delete any events that are expired as per NIP-40
|
||||
for _, d := range delEvs {
|
||||
chk.E(r.DeleteEvent(r.Ctx, eventid.NewWith(d)))
|
||||
}
|
||||
}()
|
||||
// search for the keys generated from the filter
|
||||
for _, q := range queries {
|
||||
select {
|
||||
case <-c.Done():
|
||||
|
||||
@@ -44,7 +44,7 @@ func (r *T) DeleteEvent(c context.T, eid *eventid.T, noTombstone ...bool) (err e
|
||||
}
|
||||
var indexKeys [][]byte
|
||||
ev := event.New()
|
||||
var evKey, evb, counterKey, tombstoneKey []byte
|
||||
var evKey, evb, tombstoneKey []byte
|
||||
// fetch the event to get its index keys
|
||||
err = r.View(func(txn *badger.Txn) (err error) {
|
||||
// retrieve the event record
|
||||
@@ -66,9 +66,8 @@ func (r *T) DeleteEvent(c context.T, eid *eventid.T, noTombstone ...bool) (err e
|
||||
}
|
||||
// log.I.S(rem, ev, seri)
|
||||
indexKeys = GetIndexKeysForEvent(ev, seri)
|
||||
counterKey = GetCounterKey(seri)
|
||||
// we don't make tombstones for replacements, but it is better to shift that
|
||||
// logic outside of this method.
|
||||
// logic outside of this closure.
|
||||
if len(noTombstone) > 0 && !noTombstone[0] {
|
||||
ts := tombstone.NewWith(ev.EventID())
|
||||
tombstoneKey = prefixes.Tombstone.Key(ts, createdat.New(timestamp.Now()))
|
||||
@@ -87,9 +86,6 @@ func (r *T) DeleteEvent(c context.T, eid *eventid.T, noTombstone ...bool) (err e
|
||||
if err = txn.Delete(key); chk.E(err) {
|
||||
}
|
||||
}
|
||||
if err = txn.Delete(counterKey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if len(tombstoneKey) > 0 {
|
||||
// write tombstone
|
||||
log.W.F("writing tombstone %0x for event %0x", tombstoneKey, ev.ID)
|
||||
|
||||
@@ -127,6 +127,9 @@ func (r *T) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
|
||||
k := item.KeyCopy(nil)
|
||||
evKey := prefixes.Event.Key(serial.FromKey(k))
|
||||
counter++
|
||||
if counter%1000 == 0 && counter > 0 {
|
||||
log.I.F("%d events exported", counter)
|
||||
}
|
||||
keyChan <- evKey
|
||||
}
|
||||
return
|
||||
@@ -181,6 +184,9 @@ func (r *T) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
|
||||
}
|
||||
}
|
||||
counter++
|
||||
if counter%1000 == 0 && counter > 0 {
|
||||
log.I.F("%d events exported", counter)
|
||||
}
|
||||
}
|
||||
return
|
||||
})
|
||||
|
||||
@@ -80,10 +80,10 @@ func (r *T) FetchByIds(c context.T, ids [][]byte) (evs event.Ts, err error) {
|
||||
return
|
||||
default:
|
||||
}
|
||||
// if events were found that should be deleted, delete them
|
||||
var delEvs [][]byte
|
||||
defer func() {
|
||||
for _, d := range delEvs {
|
||||
// after the count delete any events that are expired as per NIP-40
|
||||
chk.E(r.DeleteEvent(r.Ctx, eventid.NewWith(d)))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -5,7 +5,9 @@ import (
|
||||
"realy.lol/ratel/prefixes"
|
||||
)
|
||||
|
||||
// GetCounterKey returns the proper counter key for a given event ID.
|
||||
// GetCounterKey returns the proper counter key for a given event ID. This needs
|
||||
// a separate function because of what it does, but is generated in the general
|
||||
// GetIndexKeysForEvent function.
|
||||
func GetCounterKey(ser *serial.T) (key []byte) {
|
||||
key = prefixes.Counter.Key(ser)
|
||||
// log.T.F("counter key %d %d", index.Counter, ser.Uint64())
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"realy.lol/eventid"
|
||||
"realy.lol/ratel/keys"
|
||||
"realy.lol/ratel/keys/createdat"
|
||||
"realy.lol/ratel/keys/fullid"
|
||||
"realy.lol/ratel/keys/id"
|
||||
"realy.lol/ratel/keys/index"
|
||||
"realy.lol/ratel/keys/kinder"
|
||||
@@ -27,6 +28,7 @@ func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
|
||||
CA := createdat.New(ev.CreatedAt)
|
||||
K := kinder.New(ev.Kind.ToU16())
|
||||
PK, _ := pubkey.New(ev.PubKey)
|
||||
FID := fullid.New(eventid.NewWith(ev.ID))
|
||||
// indexes
|
||||
{ // ~ by id
|
||||
k := prefixes.Id.Key(ID, ser)
|
||||
@@ -92,5 +94,14 @@ func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
|
||||
// log.T.F("date key: %x %0x %0x", k[0], k[1:9], k[9:])
|
||||
keyz = append(keyz, k)
|
||||
}
|
||||
{ // Counter index - for storing last access time of events.
|
||||
k := GetCounterKey(ser)
|
||||
keyz = append(keyz, k)
|
||||
}
|
||||
{ // - full ID index - enabling retrieving the event ID without unmarshalling the data
|
||||
k := prefixes.FullIdIndex.Key(FID, CA, ser)
|
||||
// log.T.F("full id: %x %0x %0x", k[0], k[1:9], k[9:])
|
||||
keyz = append(keyz, k)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
59
ratel/keys/fullid/fullid.go
Normal file
59
ratel/keys/fullid/fullid.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package fullid
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"realy.lol/ratel/keys"
|
||||
"realy.lol/sha256"
|
||||
|
||||
"realy.lol/eventid"
|
||||
"realy.lol/hex"
|
||||
)
|
||||
|
||||
const Len = sha256.Size
|
||||
|
||||
type T struct {
|
||||
Val []byte
|
||||
}
|
||||
|
||||
var _ keys.Element = &T{}
|
||||
|
||||
func New(evID ...*eventid.T) (p *T) {
|
||||
if len(evID) < 1 || len(evID[0].String()) < 1 {
|
||||
return &T{make([]byte, Len)}
|
||||
}
|
||||
evid := evID[0].String()
|
||||
if len(evid) < 64 {
|
||||
evid = strings.Repeat("0", 64-len(evid)) + evid
|
||||
}
|
||||
if len(evid) > 64 {
|
||||
evid = evid[:64]
|
||||
}
|
||||
b, err := hex.Dec(evid[:Len*2])
|
||||
if chk.E(err) {
|
||||
return
|
||||
}
|
||||
return &T{Val: b}
|
||||
}
|
||||
|
||||
func (p *T) Write(buf *bytes.Buffer) {
|
||||
if len(p.Val) != Len {
|
||||
panic(fmt.Sprintln("must use New or initialize Val with len", Len))
|
||||
}
|
||||
buf.Write(p.Val)
|
||||
}
|
||||
|
||||
func (p *T) Read(buf *bytes.Buffer) (el keys.Element) {
|
||||
// allow uninitialized struct
|
||||
if len(p.Val) != Len {
|
||||
p.Val = make([]byte, Len)
|
||||
}
|
||||
if n, err := buf.Read(p.Val); chk.E(err) || n != Len {
|
||||
return nil
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *T) Len() int { return Len }
|
||||
25
ratel/keys/fullid/fullid_test.go
Normal file
25
ratel/keys/fullid/fullid_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package fullid
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"lukechampine.com/frand"
|
||||
|
||||
"realy.lol/eventid"
|
||||
"realy.lol/sha256"
|
||||
)
|
||||
|
||||
func TestT(t *testing.T) {
|
||||
fakeIdBytes := frand.Bytes(sha256.Size)
|
||||
id := eventid.NewWith(fakeIdBytes)
|
||||
v := New(id)
|
||||
buf := new(bytes.Buffer)
|
||||
v.Write(buf)
|
||||
buf2 := bytes.NewBuffer(buf.Bytes())
|
||||
v2 := New()
|
||||
el := v2.Read(buf2).(*T)
|
||||
if bytes.Compare(el.Val, v.Val) != 0 {
|
||||
t.Fatalf("expected %x got %x", v.Val, el.Val)
|
||||
}
|
||||
}
|
||||
9
ratel/keys/fullid/util.go
Normal file
9
ratel/keys/fullid/util.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package fullid
|
||||
|
||||
import (
|
||||
"realy.lol/lol"
|
||||
)
|
||||
|
||||
var (
|
||||
log, chk, errorf = lol.Main.Log, lol.Main.Check, lol.Main.Errorf
|
||||
)
|
||||
@@ -17,6 +17,8 @@ func (r *T) Nuke() (err error) {
|
||||
{prefixes.Tag32.B()},
|
||||
{prefixes.TagAddr.B()},
|
||||
{prefixes.Counter.B()},
|
||||
{prefixes.PubkeyIndex.B()},
|
||||
{prefixes.FullIdIndex.B()},
|
||||
}...); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package prefixes
|
||||
import (
|
||||
"realy.lol/ec/schnorr"
|
||||
"realy.lol/ratel/keys/createdat"
|
||||
"realy.lol/ratel/keys/fullid"
|
||||
"realy.lol/ratel/keys/id"
|
||||
"realy.lol/ratel/keys/index"
|
||||
"realy.lol/ratel/keys/kinder"
|
||||
@@ -87,8 +88,24 @@ const (
|
||||
// PubkeyIndex is the prefix for an index that stores a mapping between pubkeys
|
||||
// and a pubkey serial.
|
||||
//
|
||||
// todo: this is useful feature but rather than for saving space on pubkeys in
|
||||
// events might have a more useful place in some kind of search API. eg just
|
||||
// want pubkey from event id, combined with FullIdIndex.
|
||||
//
|
||||
// [ 11 ][ 32 bytes pubkey ][ 8 bytes pubkey serial ]
|
||||
PubkeyIndex
|
||||
|
||||
// FullIdIndex is a secondary table for IDs that is used to fetch the full Id
|
||||
// hash instead of fetching and unmarshalling the event. The Id index will
|
||||
// ultimately be deprecated in favor of this because returning event Ids and
|
||||
// letting the client handle pagination reduces relay complexity.
|
||||
//
|
||||
// In addition, as a mechanism of sorting, the event ID bears also a timestamp
|
||||
// from its created_at field. The serial acts as a "first seen" ordering and
|
||||
// then you also have the (claimed) chronological ordering.
|
||||
//
|
||||
// [ 2 ][ 32 bytes eventid.T ][ 8 bytes timestamp.T ][ 8 bytes Serial ]
|
||||
FullIdIndex
|
||||
)
|
||||
|
||||
// FilterPrefixes is a slice of the prefixes used by filter index to enable a loop
|
||||
@@ -102,6 +119,7 @@ var FilterPrefixes = [][]byte{
|
||||
{Tag.B()},
|
||||
{Tag32.B()},
|
||||
{TagAddr.B()},
|
||||
{FullIdIndex.B()},
|
||||
}
|
||||
|
||||
// KeySizes are the byte size of keys of each type of key prefix. int(P) or call the P.I() method
|
||||
@@ -132,4 +150,6 @@ var KeySizes = []int{
|
||||
1 + sha256.Size/2 + serial.Len,
|
||||
// PubkeyIndex
|
||||
1 + schnorr.PubKeyBytesLen + serial.Len,
|
||||
// FullIdIndex
|
||||
1 + fullid.Len + serial.Len,
|
||||
}
|
||||
|
||||
@@ -104,6 +104,7 @@ func (r *T) QueryEvents(c context.T, f *filter.T) (evs event.Ts, err error) {
|
||||
var delEvs [][]byte
|
||||
defer func() {
|
||||
for _, d := range delEvs {
|
||||
// if events were found that should be deleted, delete them
|
||||
chk.E(r.DeleteEvent(r.Ctx, eventid.NewWith(d)))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -113,17 +113,14 @@ func (r *T) SaveEvent(c context.T, ev *event.T) (err error) {
|
||||
indexKeys = GetIndexKeysForEvent(ev, ser)
|
||||
// log.I.S(indexKeys)
|
||||
for _, k := range indexKeys {
|
||||
if err = txn.Set(k, nil); chk.E(err) {
|
||||
var val []byte
|
||||
if k[0] == prefixes.Counter.B() {
|
||||
val = keys.Write(createdat.New(timestamp.Now()))
|
||||
}
|
||||
if err = txn.Set(k, val); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// initialise access counter key
|
||||
counterKey := GetCounterKey(ser)
|
||||
// log.I.S(counterKey)
|
||||
val := keys.Write(createdat.New(timestamp.Now()))
|
||||
if err = txn.Set(counterKey, val); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// log.D.F("saved event to ratel %s:\n%s", r.dataDir, ev.Serialize())
|
||||
return
|
||||
}); chk.E(err) {
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) exportHandler(h Handler) {
|
||||
if ok := s.auth(h.Request); !ok {
|
||||
if ok := s.authAdmin(h.Request); !ok {
|
||||
s.unauthorized(h)
|
||||
return
|
||||
}
|
||||
@@ -50,7 +50,7 @@ func (s *Server) exportHandler(h Handler) {
|
||||
}
|
||||
|
||||
func (s *Server) importHandler(h Handler) {
|
||||
if ok := s.auth(h.Request); !ok {
|
||||
if ok := s.authAdmin(h.Request); !ok {
|
||||
s.unauthorized(h)
|
||||
return
|
||||
}
|
||||
@@ -65,7 +65,7 @@ func (s *Server) importHandler(h Handler) {
|
||||
}
|
||||
|
||||
func (s *Server) shutdownHandler(h Handler) {
|
||||
if ok := s.auth(h.Request); !ok {
|
||||
if ok := s.authAdmin(h.Request); !ok {
|
||||
s.unauthorized(h)
|
||||
return
|
||||
}
|
||||
@@ -74,6 +74,21 @@ func (s *Server) shutdownHandler(h Handler) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Server) handleNuke(h Handler) {
|
||||
if ok := s.authAdmin(h.Request); !ok {
|
||||
s.unauthorized(h)
|
||||
return
|
||||
}
|
||||
fprintf(h.ResponseWriter, "nuking DB")
|
||||
var err error
|
||||
if err = s.relay.Storage().Nuke(); chk.E(err) {
|
||||
}
|
||||
if realy, ok := s.relay.(*app.Relay); ok {
|
||||
realy.ZeroLists()
|
||||
realy.CheckOwnerLists(context.Bg())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) defaultHandler(h Handler) {
|
||||
fprintf(h.ResponseWriter, "todo: realy web interface page\n\n")
|
||||
s.handleRelayInfo(h)
|
||||
|
||||
@@ -44,7 +44,7 @@ func (s *Server) JWTVerifyFunc(npub string) (jwtPub string, pk []byte, err error
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) auth(r *http.Request) (authed bool) {
|
||||
func (s *Server) authAdmin(r *http.Request) (authed bool) {
|
||||
var valid bool
|
||||
var pubkey []byte
|
||||
var err error
|
||||
@@ -76,10 +76,11 @@ func (s *Server) HandleHTTP(h Handler) {
|
||||
Route(h, Paths{
|
||||
"application/nostr+json": {
|
||||
"/relayinfo": s.handleRelayInfo,
|
||||
"/event": s.handleSimpleEvent,
|
||||
"/events": s.handleEvents,
|
||||
},
|
||||
"": {
|
||||
// methods that may need auth depending on configuration
|
||||
"/event": s.handleSimpleEvent,
|
||||
"/events": s.handleEvents,
|
||||
// admin methods that require REALY_ADMIN_NPUBS auth
|
||||
"/nuke": s.handleNuke, // todo: need some kind of confirmation scheme on this endpoint, particularly
|
||||
"/export": s.exportHandler,
|
||||
"/import": s.importHandler,
|
||||
"/shutdown": s.shutdownHandler,
|
||||
|
||||
@@ -17,7 +17,7 @@ func Route(h Handler, p Paths) {
|
||||
acc := h.Request.Header.Get("Accept")
|
||||
log.I.S(acc)
|
||||
for proto, fns := range p {
|
||||
if proto == acc {
|
||||
if proto == acc || proto == "" {
|
||||
for path, fn := range fns {
|
||||
if path == h.URL.Path {
|
||||
fn(h)
|
||||
|
||||
Reference in New Issue
Block a user