create new index that records the links between pubkeys, events, kinds, and inbound/outbound/author
This commit is contained in:
185
pkg/database/PUBKEY_GRAPH.md
Normal file
185
pkg/database/PUBKEY_GRAPH.md
Normal file
@@ -0,0 +1,185 @@
|
||||
# Pubkey Graph System
|
||||
|
||||
## Overview
|
||||
|
||||
The pubkey graph system provides efficient social graph queries by creating bidirectional, direction-aware edges between events and pubkeys in the ORLY relay.
|
||||
|
||||
## Architecture
|
||||
|
||||
### 1. Pubkey Serial Assignment
|
||||
|
||||
**Purpose**: Compress 32-byte pubkeys to 5-byte serials for space efficiency.
|
||||
|
||||
**Tables**:
|
||||
- `pks|pubkey_hash(8)|serial(5)` - Hash-to-serial lookup (16 bytes)
|
||||
- `spk|serial(5)` → 32-byte pubkey (value) - Serial-to-pubkey reverse lookup
|
||||
|
||||
**Space Savings**: Each graph edge saves 27 bytes per pubkey reference (32 → 5 bytes).
|
||||
|
||||
### 2. Graph Edge Storage
|
||||
|
||||
**Bidirectional edges with metadata**:
|
||||
|
||||
#### EventPubkeyGraph (Forward)
|
||||
```
|
||||
epg|event_serial(5)|pubkey_serial(5)|kind(2)|direction(1) = 16 bytes
|
||||
```
|
||||
|
||||
#### PubkeyEventGraph (Reverse)
|
||||
```
|
||||
peg|pubkey_serial(5)|kind(2)|direction(1)|event_serial(5) = 16 bytes
|
||||
```
|
||||
|
||||
### 3. Direction Byte
|
||||
|
||||
The direction byte distinguishes relationship types:
|
||||
|
||||
| Value | Direction | From Event Perspective | From Pubkey Perspective |
|
||||
|-------|-----------|------------------------|-------------------------|
|
||||
| `0` | Author | This pubkey is the event author | I am the author of this event |
|
||||
| `1` | P-Tag Out | Event references this pubkey | *(not used in reverse)* |
|
||||
| `2` | P-Tag In | *(not used in forward)* | I am referenced by this event |
|
||||
|
||||
**Location in keys**:
|
||||
- **EventPubkeyGraph**: Byte 13 (after 3+5+5)
|
||||
- **PubkeyEventGraph**: Byte 10 (after 3+5+2)
|
||||
|
||||
## Graph Edge Creation
|
||||
|
||||
When an event is saved:
|
||||
|
||||
1. **Extract pubkeys**:
|
||||
- Event author: `ev.Pubkey`
|
||||
- P-tags: All `["p", "<hex-pubkey>", ...]` tags
|
||||
|
||||
2. **Get or create serials**: Each unique pubkey gets a monotonic 5-byte serial
|
||||
|
||||
3. **Create bidirectional edges**:
|
||||
|
||||
For **author** (pubkey = event author):
|
||||
```
|
||||
epg|event_serial|author_serial|kind|0 (author edge)
|
||||
peg|author_serial|kind|0|event_serial (is-author edge)
|
||||
```
|
||||
|
||||
For each **p-tag** (referenced pubkey):
|
||||
```
|
||||
epg|event_serial|ptag_serial|kind|1 (outbound reference)
|
||||
peg|ptag_serial|kind|2|event_serial (inbound reference)
|
||||
```
|
||||
|
||||
## Query Patterns
|
||||
|
||||
### Find all events authored by a pubkey
|
||||
```
|
||||
Prefix scan: peg|pubkey_serial|*|0|*
|
||||
Filter: direction == 0 (author)
|
||||
```
|
||||
|
||||
### Find all events mentioning a pubkey (inbound p-tags)
|
||||
```
|
||||
Prefix scan: peg|pubkey_serial|*|2|*
|
||||
Filter: direction == 2 (p-tag inbound)
|
||||
```
|
||||
|
||||
### Find all kind-1 events mentioning a pubkey
|
||||
```
|
||||
Prefix scan: peg|pubkey_serial|0x0001|2|*
|
||||
Exact match: kind == 1, direction == 2
|
||||
```
|
||||
|
||||
### Find all pubkeys referenced by an event (outbound p-tags)
|
||||
```
|
||||
Prefix scan: epg|event_serial|*|*|1
|
||||
Filter: direction == 1 (p-tag outbound)
|
||||
```
|
||||
|
||||
### Find the author of an event
|
||||
```
|
||||
Prefix scan: epg|event_serial|*|*|0
|
||||
Filter: direction == 0 (author)
|
||||
```
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Thread Safety
|
||||
|
||||
The `GetOrCreatePubkeySerial` function uses:
|
||||
1. Read transaction to check for existing serial
|
||||
2. If not found, get next sequence number
|
||||
3. Write transaction with double-check to handle race conditions
|
||||
4. Returns existing serial if another goroutine created it concurrently
|
||||
|
||||
### Deduplication
|
||||
|
||||
The save-event function deduplicates pubkeys before creating serials:
|
||||
- Map keyed by hex-encoded pubkey
|
||||
- Prevents duplicate edges when author is also in p-tags
|
||||
|
||||
### Edge Cases
|
||||
|
||||
1. **Author in p-tags**: Only creates author edge (direction=0), skips duplicate p-tag edge
|
||||
2. **Invalid p-tags**: Silently skipped if hex decode fails or length != 32 bytes
|
||||
3. **No p-tags**: Only author edge is created
|
||||
|
||||
## Performance Characteristics
|
||||
|
||||
### Space Efficiency
|
||||
|
||||
Per event with N unique pubkeys:
|
||||
- **Old approach** (storing full pubkeys): N × 32 bytes = 32N bytes
|
||||
- **New approach** (using serials): N × 5 bytes = 5N bytes
|
||||
- **Savings**: 27N bytes per event (84% reduction)
|
||||
|
||||
Example: Event with author + 10 p-tags:
|
||||
- Old: 11 × 32 = 352 bytes
|
||||
- New: 11 × 5 = 55 bytes
|
||||
- **Saved: 297 bytes (84%)**
|
||||
|
||||
### Query Performance
|
||||
|
||||
1. **Pubkey lookup**: O(1) hash lookup via 8-byte truncated hash
|
||||
2. **Serial generation**: O(1) atomic increment
|
||||
3. **Graph queries**: Sequential scan with prefix optimization
|
||||
4. **Kind filtering**: Built into key ordering, no event decoding needed
|
||||
|
||||
## Testing
|
||||
|
||||
Comprehensive tests verify:
|
||||
- ✅ Serial assignment and deduplication
|
||||
- ✅ Bidirectional graph edge creation
|
||||
- ✅ Multiple events sharing pubkeys
|
||||
- ✅ Direction byte correctness
|
||||
- ✅ Edge cases (invalid pubkeys, non-existent keys)
|
||||
|
||||
## Future Query APIs
|
||||
|
||||
The graph structure supports efficient queries for:
|
||||
|
||||
1. **Social Graph Queries**:
|
||||
- Who does Alice follow? (p-tags authored by Alice)
|
||||
- Who follows Bob? (p-tags referencing Bob)
|
||||
- Common connections between Alice and Bob
|
||||
|
||||
2. **Event Discovery**:
|
||||
- All replies to Alice's events (kind-1 events with p-tag to Alice)
|
||||
- All events Alice has replied to (kind-1 events by Alice with p-tags)
|
||||
- Quote reposts, mentions, reactions by event kind
|
||||
|
||||
3. **Analytics**:
|
||||
- Most-mentioned pubkeys (count p-tag-in edges)
|
||||
- Most active authors (count author edges)
|
||||
- Interaction patterns by kind
|
||||
|
||||
## Migration Notes
|
||||
|
||||
This is a **new index** that:
|
||||
- Runs alongside existing event indexes
|
||||
- Populated automatically for all new events
|
||||
- Does NOT require reindexing existing events (yet)
|
||||
- Can be backfilled via a migration if needed
|
||||
|
||||
To backfill existing events, run a migration that:
|
||||
1. Iterates all events
|
||||
2. Extracts pubkeys and creates serials
|
||||
3. Creates graph edges for each event
|
||||
@@ -26,6 +26,7 @@ type D struct {
|
||||
Logger *logger
|
||||
*badger.DB
|
||||
seq *badger.Sequence
|
||||
pubkeySeq *badger.Sequence // Sequence for pubkey serials
|
||||
ready chan struct{} // Closed when database is ready to serve requests
|
||||
queryCache *querycache.EventCache
|
||||
}
|
||||
@@ -136,6 +137,9 @@ func New(
|
||||
if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if d.pubkeySeq, err = d.DB.GetSequence([]byte("PUBKEYS"), 1000); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// run code that updates indexes when new indexes have been added and bumps
|
||||
// the version so they aren't run again.
|
||||
d.RunMigrations()
|
||||
|
||||
@@ -75,6 +75,12 @@ const (
|
||||
WordPrefix = I("wrd") // word hash, serial
|
||||
ExpirationPrefix = I("exp") // timestamp of expiration
|
||||
VersionPrefix = I("ver") // database version number, for triggering reindexes when new keys are added (policy is add-only).
|
||||
|
||||
// Pubkey graph indexes
|
||||
PubkeySerialPrefix = I("pks") // pubkey hash -> pubkey serial
|
||||
SerialPubkeyPrefix = I("spk") // pubkey serial -> pubkey hash (full 32 bytes)
|
||||
EventPubkeyGraphPrefix = I("epg") // event serial -> pubkey serial (graph edges)
|
||||
PubkeyEventGraphPrefix = I("peg") // pubkey serial -> event serial (reverse edges)
|
||||
)
|
||||
|
||||
// Prefix returns the three byte human-readable prefixes that go in front of
|
||||
@@ -118,6 +124,15 @@ func Prefix(prf int) (i I) {
|
||||
return VersionPrefix
|
||||
case Word:
|
||||
return WordPrefix
|
||||
|
||||
case PubkeySerial:
|
||||
return PubkeySerialPrefix
|
||||
case SerialPubkey:
|
||||
return SerialPubkeyPrefix
|
||||
case EventPubkeyGraph:
|
||||
return EventPubkeyGraphPrefix
|
||||
case PubkeyEventGraph:
|
||||
return PubkeyEventGraphPrefix
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -167,6 +182,15 @@ func Identify(r io.Reader) (i int, err error) {
|
||||
i = Expiration
|
||||
case WordPrefix:
|
||||
i = Word
|
||||
|
||||
case PubkeySerialPrefix:
|
||||
i = PubkeySerial
|
||||
case SerialPubkeyPrefix:
|
||||
i = SerialPubkey
|
||||
case EventPubkeyGraphPrefix:
|
||||
i = EventPubkeyGraph
|
||||
case PubkeyEventGraphPrefix:
|
||||
i = PubkeyEventGraph
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -519,3 +543,68 @@ func VersionDec(
|
||||
) (enc *T) {
|
||||
return New(NewPrefix(), ver)
|
||||
}
|
||||
|
||||
// PubkeySerial maps a pubkey hash to its unique serial number
|
||||
//
|
||||
// 3 prefix|8 pubkey hash|5 serial
|
||||
var PubkeySerial = next()
|
||||
|
||||
func PubkeySerialVars() (p *types.PubHash, ser *types.Uint40) {
|
||||
return new(types.PubHash), new(types.Uint40)
|
||||
}
|
||||
func PubkeySerialEnc(p *types.PubHash, ser *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(PubkeySerial), p, ser)
|
||||
}
|
||||
func PubkeySerialDec(p *types.PubHash, ser *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(), p, ser)
|
||||
}
|
||||
|
||||
// SerialPubkey maps a pubkey serial to the full 32-byte pubkey
|
||||
// This stores the full pubkey (32 bytes) as the value, not inline
|
||||
//
|
||||
// 3 prefix|5 serial -> 32 byte pubkey value
|
||||
var SerialPubkey = next()
|
||||
|
||||
func SerialPubkeyVars() (ser *types.Uint40) {
|
||||
return new(types.Uint40)
|
||||
}
|
||||
func SerialPubkeyEnc(ser *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(SerialPubkey), ser)
|
||||
}
|
||||
func SerialPubkeyDec(ser *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(), ser)
|
||||
}
|
||||
|
||||
// EventPubkeyGraph creates a bidirectional graph edge between events and pubkeys
|
||||
// This stores event_serial -> pubkey_serial relationships with event kind and direction
|
||||
// Direction: 0=author, 1=p-tag-out (event references pubkey)
|
||||
//
|
||||
// 3 prefix|5 event serial|5 pubkey serial|2 kind|1 direction
|
||||
var EventPubkeyGraph = next()
|
||||
|
||||
func EventPubkeyGraphVars() (eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) {
|
||||
return new(types.Uint40), new(types.Uint40), new(types.Uint16), new(types.Letter)
|
||||
}
|
||||
func EventPubkeyGraphEnc(eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) (enc *T) {
|
||||
return New(NewPrefix(EventPubkeyGraph), eventSer, pubkeySer, kind, direction)
|
||||
}
|
||||
func EventPubkeyGraphDec(eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) (enc *T) {
|
||||
return New(NewPrefix(), eventSer, pubkeySer, kind, direction)
|
||||
}
|
||||
|
||||
// PubkeyEventGraph creates the reverse edge: pubkey_serial -> event_serial with event kind and direction
|
||||
// This enables querying all events related to a pubkey, optionally filtered by kind and direction
|
||||
// Direction: 0=is-author, 2=p-tag-in (pubkey is referenced by event)
|
||||
//
|
||||
// 3 prefix|5 pubkey serial|2 kind|1 direction|5 event serial
|
||||
var PubkeyEventGraph = next()
|
||||
|
||||
func PubkeyEventGraphVars() (pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) {
|
||||
return new(types.Uint40), new(types.Uint16), new(types.Letter), new(types.Uint40)
|
||||
}
|
||||
func PubkeyEventGraphEnc(pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(PubkeyEventGraph), pubkeySer, kind, direction, eventSer)
|
||||
}
|
||||
func PubkeyEventGraphDec(pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(), pubkeySer, kind, direction, eventSer)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,13 @@ import (
|
||||
|
||||
const LetterLen = 1
|
||||
|
||||
// Edge direction constants for pubkey graph relationships
|
||||
const (
|
||||
EdgeDirectionAuthor byte = 0 // The pubkey is the event author
|
||||
EdgeDirectionPTagOut byte = 1 // Outbound: Event author references this pubkey in p-tag
|
||||
EdgeDirectionPTagIn byte = 2 // Inbound: This pubkey is referenced in event's p-tag
|
||||
)
|
||||
|
||||
type Letter struct {
|
||||
val byte
|
||||
}
|
||||
|
||||
365
pkg/database/pubkey-graph_test.go
Normal file
365
pkg/database/pubkey-graph_test.go
Normal file
@@ -0,0 +1,365 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
)
|
||||
|
||||
func TestPubkeySerialAssignment(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
db, err := New(ctx, cancel, t.TempDir(), "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create a test pubkey
|
||||
pubkey1 := make([]byte, 32)
|
||||
for i := range pubkey1 {
|
||||
pubkey1[i] = byte(i)
|
||||
}
|
||||
|
||||
// Get or create serial for the first time
|
||||
t.Logf("First call: GetOrCreatePubkeySerial for pubkey %s", hex.Enc(pubkey1))
|
||||
ser1, err := db.GetOrCreatePubkeySerial(pubkey1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get or create pubkey serial: %v", err)
|
||||
}
|
||||
|
||||
if ser1 == nil {
|
||||
t.Fatal("Serial should not be nil")
|
||||
}
|
||||
t.Logf("First call returned serial: %d", ser1.Get())
|
||||
|
||||
// Debug: List all keys in database
|
||||
var keyCount int
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
it := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
key := it.Item().KeyCopy(nil)
|
||||
t.Logf("Found key: %s (len=%d)", hex.Enc(key), len(key))
|
||||
keyCount++
|
||||
if keyCount > 20 {
|
||||
break // Limit output
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
t.Logf("Total keys found (first 20): %d", keyCount)
|
||||
|
||||
// Debug: what prefix should we be looking for?
|
||||
pubHash := new(types.PubHash)
|
||||
pubHash.FromPubkey(pubkey1)
|
||||
expectedPrefix := []byte(indexes.PubkeySerialPrefix)
|
||||
t.Logf("Expected PubkeySerial prefix: %s = %s", string(expectedPrefix), hex.Enc(expectedPrefix))
|
||||
|
||||
// Try direct lookup
|
||||
t.Logf("Direct lookup: GetPubkeySerial for same pubkey")
|
||||
serDirect, err := db.GetPubkeySerial(pubkey1)
|
||||
if err != nil {
|
||||
t.Logf("Direct lookup failed: %v", err)
|
||||
} else {
|
||||
t.Logf("Direct lookup returned serial: %d", serDirect.Get())
|
||||
}
|
||||
|
||||
// Get the same pubkey again - should return the same serial
|
||||
t.Logf("Second call: GetOrCreatePubkeySerial for same pubkey")
|
||||
ser2, err := db.GetOrCreatePubkeySerial(pubkey1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get existing pubkey serial: %v", err)
|
||||
}
|
||||
t.Logf("Second call returned serial: %d", ser2.Get())
|
||||
|
||||
if ser1.Get() != ser2.Get() {
|
||||
t.Errorf("Expected same serial, got %d and %d", ser1.Get(), ser2.Get())
|
||||
}
|
||||
|
||||
// Create a different pubkey
|
||||
pubkey2 := make([]byte, 32)
|
||||
for i := range pubkey2 {
|
||||
pubkey2[i] = byte(i + 100)
|
||||
}
|
||||
|
||||
ser3, err := db.GetOrCreatePubkeySerial(pubkey2)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get or create second pubkey serial: %v", err)
|
||||
}
|
||||
|
||||
if ser3.Get() == ser1.Get() {
|
||||
t.Error("Different pubkeys should have different serials")
|
||||
}
|
||||
|
||||
// Test reverse lookup: serial -> pubkey
|
||||
retrievedPubkey1, err := db.GetPubkeyBySerial(ser1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pubkey by serial: %v", err)
|
||||
}
|
||||
|
||||
if hex.Enc(retrievedPubkey1) != hex.Enc(pubkey1) {
|
||||
t.Errorf("Retrieved pubkey doesn't match. Expected %s, got %s",
|
||||
hex.Enc(pubkey1), hex.Enc(retrievedPubkey1))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventPubkeyGraph(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
db, err := New(ctx, cancel, t.TempDir(), "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create test event with author and p-tags
|
||||
authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001")
|
||||
pTagPubkey1, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002")
|
||||
pTagPubkey2, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000003")
|
||||
|
||||
eventID := make([]byte, 32)
|
||||
eventID[0] = 1
|
||||
eventSig := make([]byte, 64)
|
||||
eventSig[0] = 1
|
||||
|
||||
ev := &event.E{
|
||||
ID: eventID,
|
||||
Pubkey: authorPubkey,
|
||||
CreatedAt: 1234567890,
|
||||
Kind: 1, // text note
|
||||
Content: []byte("Test event with p-tags"),
|
||||
Sig: eventSig,
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("p", hex.Enc(pTagPubkey1)),
|
||||
tag.NewFromAny("p", hex.Enc(pTagPubkey2)),
|
||||
tag.NewFromAny("e", "someeventid"),
|
||||
),
|
||||
}
|
||||
|
||||
// Save the event - this should create pubkey serials and graph edges
|
||||
_, err = db.SaveEvent(ctx, ev)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to save event: %v", err)
|
||||
}
|
||||
|
||||
// Verify that pubkey serials were created
|
||||
authorSerial, err := db.GetPubkeySerial(authorPubkey)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get author pubkey serial: %v", err)
|
||||
}
|
||||
if authorSerial == nil {
|
||||
t.Fatal("Author serial should not be nil")
|
||||
}
|
||||
|
||||
pTag1Serial, err := db.GetPubkeySerial(pTagPubkey1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get p-tag1 pubkey serial: %v", err)
|
||||
}
|
||||
if pTag1Serial == nil {
|
||||
t.Fatal("P-tag1 serial should not be nil")
|
||||
}
|
||||
|
||||
pTag2Serial, err := db.GetPubkeySerial(pTagPubkey2)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get p-tag2 pubkey serial: %v", err)
|
||||
}
|
||||
if pTag2Serial == nil {
|
||||
t.Fatal("P-tag2 serial should not be nil")
|
||||
}
|
||||
|
||||
// Verify all three pubkeys have different serials
|
||||
if authorSerial.Get() == pTag1Serial.Get() || authorSerial.Get() == pTag2Serial.Get() || pTag1Serial.Get() == pTag2Serial.Get() {
|
||||
t.Error("All pubkey serials should be unique")
|
||||
}
|
||||
|
||||
t.Logf("Event saved successfully with graph edges:")
|
||||
t.Logf(" Author serial: %d", authorSerial.Get())
|
||||
t.Logf(" P-tag1 serial: %d", pTag1Serial.Get())
|
||||
t.Logf(" P-tag2 serial: %d", pTag2Serial.Get())
|
||||
}
|
||||
|
||||
func TestMultipleEventsWithSamePubkeys(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
db, err := New(ctx, cancel, t.TempDir(), "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create two events from the same author mentioning the same person
|
||||
authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001")
|
||||
pTagPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002")
|
||||
|
||||
eventID1 := make([]byte, 32)
|
||||
eventID1[0] = 1
|
||||
eventSig1 := make([]byte, 64)
|
||||
eventSig1[0] = 1
|
||||
|
||||
ev1 := &event.E{
|
||||
ID: eventID1,
|
||||
Pubkey: authorPubkey,
|
||||
CreatedAt: 1234567890,
|
||||
Kind: 1,
|
||||
Content: []byte("First event"),
|
||||
Sig: eventSig1,
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("p", hex.Enc(pTagPubkey)),
|
||||
),
|
||||
}
|
||||
|
||||
eventID2 := make([]byte, 32)
|
||||
eventID2[0] = 2
|
||||
eventSig2 := make([]byte, 64)
|
||||
eventSig2[0] = 2
|
||||
|
||||
ev2 := &event.E{
|
||||
ID: eventID2,
|
||||
Pubkey: authorPubkey,
|
||||
CreatedAt: 1234567891,
|
||||
Kind: 1,
|
||||
Content: []byte("Second event"),
|
||||
Sig: eventSig2,
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("p", hex.Enc(pTagPubkey)),
|
||||
),
|
||||
}
|
||||
|
||||
// Save both events
|
||||
_, err = db.SaveEvent(ctx, ev1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to save event 1: %v", err)
|
||||
}
|
||||
|
||||
_, err = db.SaveEvent(ctx, ev2)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to save event 2: %v", err)
|
||||
}
|
||||
|
||||
// Verify the same pubkeys got the same serials
|
||||
authorSerial1, _ := db.GetPubkeySerial(authorPubkey)
|
||||
pTagSerial1, _ := db.GetPubkeySerial(pTagPubkey)
|
||||
|
||||
if authorSerial1 == nil || pTagSerial1 == nil {
|
||||
t.Fatal("Pubkey serials should exist after saving events")
|
||||
}
|
||||
|
||||
t.Logf("Both events share the same pubkey serials:")
|
||||
t.Logf(" Author serial: %d", authorSerial1.Get())
|
||||
t.Logf(" P-tag serial: %d", pTagSerial1.Get())
|
||||
}
|
||||
|
||||
func TestPubkeySerialEdgeCases(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
db, err := New(ctx, cancel, t.TempDir(), "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Test with invalid pubkey length
|
||||
invalidPubkey := make([]byte, 16) // Wrong length
|
||||
_, err = db.GetOrCreatePubkeySerial(invalidPubkey)
|
||||
if err == nil {
|
||||
t.Error("Should reject pubkey with invalid length")
|
||||
}
|
||||
|
||||
// Test GetPubkeySerial for non-existent pubkey
|
||||
nonExistentPubkey := make([]byte, 32)
|
||||
for i := range nonExistentPubkey {
|
||||
nonExistentPubkey[i] = 0xFF
|
||||
}
|
||||
|
||||
_, err = db.GetPubkeySerial(nonExistentPubkey)
|
||||
if err == nil {
|
||||
t.Error("Should return error for non-existent pubkey serial")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGraphEdgeDirections(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
db, err := New(ctx, cancel, t.TempDir(), "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create test event with author and p-tags
|
||||
authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001")
|
||||
pTagPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002")
|
||||
|
||||
eventID := make([]byte, 32)
|
||||
eventID[0] = 1
|
||||
eventSig := make([]byte, 64)
|
||||
eventSig[0] = 1
|
||||
|
||||
ev := &event.E{
|
||||
ID: eventID,
|
||||
Pubkey: authorPubkey,
|
||||
CreatedAt: 1234567890,
|
||||
Kind: 1, // text note
|
||||
Content: []byte("Test event"),
|
||||
Sig: eventSig,
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("p", hex.Enc(pTagPubkey)),
|
||||
),
|
||||
}
|
||||
|
||||
// Save the event
|
||||
_, err = db.SaveEvent(ctx, ev)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to save event: %v", err)
|
||||
}
|
||||
|
||||
// Verify graph edges with correct direction bytes
|
||||
// Look for PubkeyEventGraph keys and check direction byte
|
||||
var foundAuthorEdge, foundPTagEdge bool
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
it := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||
defer it.Close()
|
||||
|
||||
prefix := []byte(indexes.PubkeyEventGraphPrefix)
|
||||
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
|
||||
key := it.Item().KeyCopy(nil)
|
||||
// Key format: peg(3)|pubkey_serial(5)|kind(2)|direction(1)|event_serial(5) = 16 bytes
|
||||
if len(key) == 16 {
|
||||
direction := key[10] // Byte at position 10 is the direction
|
||||
t.Logf("Found PubkeyEventGraph edge: key=%s, direction=%d", hex.Enc(key), direction)
|
||||
|
||||
if direction == types.EdgeDirectionAuthor {
|
||||
foundAuthorEdge = true
|
||||
t.Logf(" ✓ Found author edge (direction=0)")
|
||||
} else if direction == types.EdgeDirectionPTagIn {
|
||||
foundPTagEdge = true
|
||||
t.Logf(" ✓ Found p-tag inbound edge (direction=2)")
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if !foundAuthorEdge {
|
||||
t.Error("Did not find author edge with direction=0")
|
||||
}
|
||||
if !foundPTagEdge {
|
||||
t.Error("Did not find p-tag inbound edge with direction=2")
|
||||
}
|
||||
|
||||
t.Logf("Graph edges correctly stored with direction bytes:")
|
||||
t.Logf(" Author edge: %v (direction=0)", foundAuthorEdge)
|
||||
t.Logf(" P-tag inbound edge: %v (direction=2)", foundPTagEdge)
|
||||
}
|
||||
197
pkg/database/pubkey-serial.go
Normal file
197
pkg/database/pubkey-serial.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
)
|
||||
|
||||
// GetOrCreatePubkeySerial returns the serial for a pubkey, creating one if it doesn't exist.
|
||||
// The pubkey parameter should be 32 bytes (schnorr public key).
|
||||
// This function is thread-safe and uses transactions to ensure atomicity.
|
||||
func (d *D) GetOrCreatePubkeySerial(pubkey []byte) (ser *types.Uint40, err error) {
|
||||
if len(pubkey) != 32 {
|
||||
err = errors.New("pubkey must be 32 bytes")
|
||||
return
|
||||
}
|
||||
|
||||
// Create pubkey hash
|
||||
pubHash := new(types.PubHash)
|
||||
if err = pubHash.FromPubkey(pubkey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// First, try to get existing serial (separate transaction for read)
|
||||
var existingSer *types.Uint40
|
||||
existingSer, err = d.GetPubkeySerial(pubkey)
|
||||
if err == nil && existingSer != nil {
|
||||
// Serial already exists
|
||||
ser = existingSer
|
||||
return ser, nil
|
||||
}
|
||||
|
||||
// Serial doesn't exist, create a new one
|
||||
var serial uint64
|
||||
if serial, err = d.pubkeySeq.Next(); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
ser = new(types.Uint40)
|
||||
if err = ser.Set(serial); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Store both mappings in a transaction
|
||||
err = d.Update(func(txn *badger.Txn) error {
|
||||
// Double-check that the serial wasn't created by another goroutine
|
||||
// while we were getting the sequence number
|
||||
prefixBuf := new(bytes.Buffer)
|
||||
prefixBuf.Write([]byte(indexes.PubkeySerialPrefix))
|
||||
if terr := pubHash.MarshalWrite(prefixBuf); chk.E(terr) {
|
||||
return terr
|
||||
}
|
||||
searchPrefix := prefixBuf.Bytes()
|
||||
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false
|
||||
opts.Prefix = searchPrefix
|
||||
it := txn.NewIterator(opts)
|
||||
it.Seek(searchPrefix)
|
||||
if it.Valid() {
|
||||
// Another goroutine created it, extract and return that serial
|
||||
key := it.Item().KeyCopy(nil)
|
||||
it.Close()
|
||||
if len(key) == 16 {
|
||||
serialBytes := key[11:16]
|
||||
serialBuf := bytes.NewReader(serialBytes)
|
||||
existSer := new(types.Uint40)
|
||||
if terr := existSer.UnmarshalRead(serialBuf); terr == nil {
|
||||
ser = existSer
|
||||
return nil // Don't write, just return the existing serial
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
|
||||
// Store pubkey hash -> serial mapping
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if terr := indexes.PubkeySerialEnc(pubHash, ser).MarshalWrite(keyBuf); chk.E(terr) {
|
||||
return terr
|
||||
}
|
||||
fullKey := make([]byte, len(keyBuf.Bytes()))
|
||||
copy(fullKey, keyBuf.Bytes())
|
||||
// DEBUG: log the key being written
|
||||
if len(fullKey) > 0 {
|
||||
// log.T.F("Writing PubkeySerial: key=%s (len=%d), prefix=%s", hex.Enc(fullKey), len(fullKey), string(fullKey[:3]))
|
||||
}
|
||||
if terr := txn.Set(fullKey, nil); chk.E(terr) {
|
||||
return terr
|
||||
}
|
||||
|
||||
// Store serial -> full pubkey mapping (pubkey stored as value)
|
||||
keyBuf.Reset()
|
||||
if terr := indexes.SerialPubkeyEnc(ser).MarshalWrite(keyBuf); chk.E(terr) {
|
||||
return terr
|
||||
}
|
||||
if terr := txn.Set(keyBuf.Bytes(), pubkey); chk.E(terr) {
|
||||
return terr
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetPubkeySerial returns the serial for a pubkey if it exists.
|
||||
// Returns an error if the pubkey doesn't have a serial yet.
|
||||
func (d *D) GetPubkeySerial(pubkey []byte) (ser *types.Uint40, err error) {
|
||||
if len(pubkey) != 32 {
|
||||
err = errors.New("pubkey must be 32 bytes")
|
||||
return
|
||||
}
|
||||
|
||||
// Create pubkey hash
|
||||
pubHash := new(types.PubHash)
|
||||
if err = pubHash.FromPubkey(pubkey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Build search key with just prefix + pubkey hash (no serial)
|
||||
prefixBuf := new(bytes.Buffer)
|
||||
prefixBuf.Write([]byte(indexes.PubkeySerialPrefix)) // 3 bytes
|
||||
if err = pubHash.MarshalWrite(prefixBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
searchPrefix := prefixBuf.Bytes() // Should be 11 bytes: 3 (prefix) + 8 (pubkey hash)
|
||||
|
||||
ser = new(types.Uint40)
|
||||
err = d.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false // We only need the key
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
// Seek to the prefix and check if we found a matching key
|
||||
it.Seek(searchPrefix)
|
||||
if !it.ValidForPrefix(searchPrefix) {
|
||||
return errors.New("pubkey serial not found")
|
||||
}
|
||||
|
||||
// Extract serial from key (last 5 bytes)
|
||||
// Key format: prefix(3) + pubkey_hash(8) + serial(5) = 16 bytes
|
||||
key := it.Item().KeyCopy(nil)
|
||||
if len(key) != 16 {
|
||||
return errors.New("invalid key length for pubkey serial")
|
||||
}
|
||||
|
||||
// Verify the prefix matches
|
||||
if !bytes.HasPrefix(key, searchPrefix) {
|
||||
return errors.New("key prefix mismatch")
|
||||
}
|
||||
|
||||
serialBytes := key[11:16] // Extract last 5 bytes (the serial)
|
||||
|
||||
// Decode serial
|
||||
serialBuf := bytes.NewReader(serialBytes)
|
||||
if err := ser.UnmarshalRead(serialBuf); chk.E(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetPubkeyBySerial returns the full 32-byte pubkey for a given serial.
|
||||
func (d *D) GetPubkeyBySerial(ser *types.Uint40) (pubkey []byte, err error) {
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.SerialPubkeyEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
err = d.View(func(txn *badger.Txn) error {
|
||||
item, gerr := txn.Get(keyBuf.Bytes())
|
||||
if chk.E(gerr) {
|
||||
return gerr
|
||||
}
|
||||
|
||||
return item.Value(func(val []byte) error {
|
||||
pubkey = make([]byte, len(val))
|
||||
copy(pubkey, val)
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
err = errors.New("pubkey not found for serial: " + hex.Enc([]byte{byte(ser.Get())}))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
@@ -180,6 +180,47 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all pubkeys for graph: author + p-tags
|
||||
// Store with direction indicator: author (0) vs p-tag (1)
|
||||
type pubkeyWithDirection struct {
|
||||
serial *types.Uint40
|
||||
isAuthor bool
|
||||
}
|
||||
pubkeysForGraph := make(map[string]pubkeyWithDirection)
|
||||
|
||||
// Add author pubkey
|
||||
var authorSerial *types.Uint40
|
||||
if authorSerial, err = d.GetOrCreatePubkeySerial(ev.Pubkey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
pubkeysForGraph[hex.Enc(ev.Pubkey)] = pubkeyWithDirection{
|
||||
serial: authorSerial,
|
||||
isAuthor: true,
|
||||
}
|
||||
|
||||
// Extract p-tag pubkeys using GetAll
|
||||
pTags := ev.Tags.GetAll([]byte("p"))
|
||||
for _, pTag := range pTags {
|
||||
if len(pTag.T) >= 2 {
|
||||
// Decode hex pubkey from p-tag
|
||||
var ptagPubkey []byte
|
||||
if ptagPubkey, err = hex.Dec(string(pTag.T[tag.Value])); err == nil && len(ptagPubkey) == 32 {
|
||||
pkHex := hex.Enc(ptagPubkey)
|
||||
// Skip if already added as author
|
||||
if _, exists := pubkeysForGraph[pkHex]; !exists {
|
||||
var ptagSerial *types.Uint40
|
||||
if ptagSerial, err = d.GetOrCreatePubkeySerial(ptagPubkey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
pubkeysForGraph[pkHex] = pubkeyWithDirection{
|
||||
serial: ptagSerial,
|
||||
isAuthor: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// log.T.F(
|
||||
// "SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs),
|
||||
// ev.ID, ev.Kind,
|
||||
@@ -320,6 +361,48 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
}
|
||||
log.T.F("SaveEvent: also stored replaceable event with specialized key")
|
||||
}
|
||||
|
||||
// Create graph edges between event and all related pubkeys
|
||||
// This creates bidirectional edges: event->pubkey and pubkey->event
|
||||
// Include the event kind and direction for efficient graph queries
|
||||
eventKind := new(types.Uint16)
|
||||
eventKind.Set(ev.Kind)
|
||||
|
||||
for _, pkInfo := range pubkeysForGraph {
|
||||
// Determine direction for forward edge (event -> pubkey perspective)
|
||||
directionForward := new(types.Letter)
|
||||
// Determine direction for reverse edge (pubkey -> event perspective)
|
||||
directionReverse := new(types.Letter)
|
||||
|
||||
if pkInfo.isAuthor {
|
||||
// Event author relationship
|
||||
directionForward.Set(types.EdgeDirectionAuthor) // 0: author
|
||||
directionReverse.Set(types.EdgeDirectionAuthor) // 0: is author of event
|
||||
} else {
|
||||
// P-tag relationship
|
||||
directionForward.Set(types.EdgeDirectionPTagOut) // 1: event references pubkey (outbound)
|
||||
directionReverse.Set(types.EdgeDirectionPTagIn) // 2: pubkey is referenced (inbound)
|
||||
}
|
||||
|
||||
// Create event -> pubkey edge (with kind and direction)
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Create pubkey -> event edge (reverse, with kind and direction for filtering)
|
||||
keyBuf.Reset()
|
||||
if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user