diff --git a/pkg/database/PUBKEY_GRAPH.md b/pkg/database/PUBKEY_GRAPH.md new file mode 100644 index 0000000..2354f88 --- /dev/null +++ b/pkg/database/PUBKEY_GRAPH.md @@ -0,0 +1,185 @@ +# Pubkey Graph System + +## Overview + +The pubkey graph system provides efficient social graph queries by creating bidirectional, direction-aware edges between events and pubkeys in the ORLY relay. + +## Architecture + +### 1. Pubkey Serial Assignment + +**Purpose**: Compress 32-byte pubkeys to 5-byte serials for space efficiency. + +**Tables**: +- `pks|pubkey_hash(8)|serial(5)` - Hash-to-serial lookup (16 bytes) +- `spk|serial(5)` → 32-byte pubkey (value) - Serial-to-pubkey reverse lookup + +**Space Savings**: Each graph edge saves 27 bytes per pubkey reference (32 → 5 bytes). + +### 2. Graph Edge Storage + +**Bidirectional edges with metadata**: + +#### EventPubkeyGraph (Forward) +``` +epg|event_serial(5)|pubkey_serial(5)|kind(2)|direction(1) = 16 bytes +``` + +#### PubkeyEventGraph (Reverse) +``` +peg|pubkey_serial(5)|kind(2)|direction(1)|event_serial(5) = 16 bytes +``` + +### 3. Direction Byte + +The direction byte distinguishes relationship types: + +| Value | Direction | From Event Perspective | From Pubkey Perspective | +|-------|-----------|------------------------|-------------------------| +| `0` | Author | This pubkey is the event author | I am the author of this event | +| `1` | P-Tag Out | Event references this pubkey | *(not used in reverse)* | +| `2` | P-Tag In | *(not used in forward)* | I am referenced by this event | + +**Location in keys**: +- **EventPubkeyGraph**: Byte 13 (after 3+5+5) +- **PubkeyEventGraph**: Byte 10 (after 3+5+2) + +## Graph Edge Creation + +When an event is saved: + +1. **Extract pubkeys**: + - Event author: `ev.Pubkey` + - P-tags: All `["p", "", ...]` tags + +2. **Get or create serials**: Each unique pubkey gets a monotonic 5-byte serial + +3. **Create bidirectional edges**: + + For **author** (pubkey = event author): + ``` + epg|event_serial|author_serial|kind|0 (author edge) + peg|author_serial|kind|0|event_serial (is-author edge) + ``` + + For each **p-tag** (referenced pubkey): + ``` + epg|event_serial|ptag_serial|kind|1 (outbound reference) + peg|ptag_serial|kind|2|event_serial (inbound reference) + ``` + +## Query Patterns + +### Find all events authored by a pubkey +``` +Prefix scan: peg|pubkey_serial|*|0|* +Filter: direction == 0 (author) +``` + +### Find all events mentioning a pubkey (inbound p-tags) +``` +Prefix scan: peg|pubkey_serial|*|2|* +Filter: direction == 2 (p-tag inbound) +``` + +### Find all kind-1 events mentioning a pubkey +``` +Prefix scan: peg|pubkey_serial|0x0001|2|* +Exact match: kind == 1, direction == 2 +``` + +### Find all pubkeys referenced by an event (outbound p-tags) +``` +Prefix scan: epg|event_serial|*|*|1 +Filter: direction == 1 (p-tag outbound) +``` + +### Find the author of an event +``` +Prefix scan: epg|event_serial|*|*|0 +Filter: direction == 0 (author) +``` + +## Implementation Details + +### Thread Safety + +The `GetOrCreatePubkeySerial` function uses: +1. Read transaction to check for existing serial +2. If not found, get next sequence number +3. Write transaction with double-check to handle race conditions +4. Returns existing serial if another goroutine created it concurrently + +### Deduplication + +The save-event function deduplicates pubkeys before creating serials: +- Map keyed by hex-encoded pubkey +- Prevents duplicate edges when author is also in p-tags + +### Edge Cases + +1. **Author in p-tags**: Only creates author edge (direction=0), skips duplicate p-tag edge +2. **Invalid p-tags**: Silently skipped if hex decode fails or length != 32 bytes +3. **No p-tags**: Only author edge is created + +## Performance Characteristics + +### Space Efficiency + +Per event with N unique pubkeys: +- **Old approach** (storing full pubkeys): N × 32 bytes = 32N bytes +- **New approach** (using serials): N × 5 bytes = 5N bytes +- **Savings**: 27N bytes per event (84% reduction) + +Example: Event with author + 10 p-tags: +- Old: 11 × 32 = 352 bytes +- New: 11 × 5 = 55 bytes +- **Saved: 297 bytes (84%)** + +### Query Performance + +1. **Pubkey lookup**: O(1) hash lookup via 8-byte truncated hash +2. **Serial generation**: O(1) atomic increment +3. **Graph queries**: Sequential scan with prefix optimization +4. **Kind filtering**: Built into key ordering, no event decoding needed + +## Testing + +Comprehensive tests verify: +- ✅ Serial assignment and deduplication +- ✅ Bidirectional graph edge creation +- ✅ Multiple events sharing pubkeys +- ✅ Direction byte correctness +- ✅ Edge cases (invalid pubkeys, non-existent keys) + +## Future Query APIs + +The graph structure supports efficient queries for: + +1. **Social Graph Queries**: + - Who does Alice follow? (p-tags authored by Alice) + - Who follows Bob? (p-tags referencing Bob) + - Common connections between Alice and Bob + +2. **Event Discovery**: + - All replies to Alice's events (kind-1 events with p-tag to Alice) + - All events Alice has replied to (kind-1 events by Alice with p-tags) + - Quote reposts, mentions, reactions by event kind + +3. **Analytics**: + - Most-mentioned pubkeys (count p-tag-in edges) + - Most active authors (count author edges) + - Interaction patterns by kind + +## Migration Notes + +This is a **new index** that: +- Runs alongside existing event indexes +- Populated automatically for all new events +- Does NOT require reindexing existing events (yet) +- Can be backfilled via a migration if needed + +To backfill existing events, run a migration that: +1. Iterates all events +2. Extracts pubkeys and creates serials +3. Creates graph edges for each event diff --git a/pkg/database/database.go b/pkg/database/database.go index 289eab3..5175da6 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -26,7 +26,8 @@ type D struct { Logger *logger *badger.DB seq *badger.Sequence - ready chan struct{} // Closed when database is ready to serve requests + pubkeySeq *badger.Sequence // Sequence for pubkey serials + ready chan struct{} // Closed when database is ready to serve requests queryCache *querycache.EventCache } @@ -136,6 +137,9 @@ func New( if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) { return } + if d.pubkeySeq, err = d.DB.GetSequence([]byte("PUBKEYS"), 1000); chk.E(err) { + return + } // run code that updates indexes when new indexes have been added and bumps // the version so they aren't run again. d.RunMigrations() diff --git a/pkg/database/indexes/keys.go b/pkg/database/indexes/keys.go index 88a05c8..a865738 100644 --- a/pkg/database/indexes/keys.go +++ b/pkg/database/indexes/keys.go @@ -72,9 +72,15 @@ const ( TagPubkeyPrefix = I("tpc") // tag, pubkey, created at TagKindPubkeyPrefix = I("tkp") // tag, kind, pubkey, created at - WordPrefix = I("wrd") // word hash, serial + WordPrefix = I("wrd") // word hash, serial ExpirationPrefix = I("exp") // timestamp of expiration VersionPrefix = I("ver") // database version number, for triggering reindexes when new keys are added (policy is add-only). + + // Pubkey graph indexes + PubkeySerialPrefix = I("pks") // pubkey hash -> pubkey serial + SerialPubkeyPrefix = I("spk") // pubkey serial -> pubkey hash (full 32 bytes) + EventPubkeyGraphPrefix = I("epg") // event serial -> pubkey serial (graph edges) + PubkeyEventGraphPrefix = I("peg") // pubkey serial -> event serial (reverse edges) ) // Prefix returns the three byte human-readable prefixes that go in front of @@ -118,6 +124,15 @@ func Prefix(prf int) (i I) { return VersionPrefix case Word: return WordPrefix + + case PubkeySerial: + return PubkeySerialPrefix + case SerialPubkey: + return SerialPubkeyPrefix + case EventPubkeyGraph: + return EventPubkeyGraphPrefix + case PubkeyEventGraph: + return PubkeyEventGraphPrefix } return } @@ -167,6 +182,15 @@ func Identify(r io.Reader) (i int, err error) { i = Expiration case WordPrefix: i = Word + + case PubkeySerialPrefix: + i = PubkeySerial + case SerialPubkeyPrefix: + i = SerialPubkey + case EventPubkeyGraphPrefix: + i = EventPubkeyGraph + case PubkeyEventGraphPrefix: + i = PubkeyEventGraph } return } @@ -519,3 +543,68 @@ func VersionDec( ) (enc *T) { return New(NewPrefix(), ver) } + +// PubkeySerial maps a pubkey hash to its unique serial number +// +// 3 prefix|8 pubkey hash|5 serial +var PubkeySerial = next() + +func PubkeySerialVars() (p *types.PubHash, ser *types.Uint40) { + return new(types.PubHash), new(types.Uint40) +} +func PubkeySerialEnc(p *types.PubHash, ser *types.Uint40) (enc *T) { + return New(NewPrefix(PubkeySerial), p, ser) +} +func PubkeySerialDec(p *types.PubHash, ser *types.Uint40) (enc *T) { + return New(NewPrefix(), p, ser) +} + +// SerialPubkey maps a pubkey serial to the full 32-byte pubkey +// This stores the full pubkey (32 bytes) as the value, not inline +// +// 3 prefix|5 serial -> 32 byte pubkey value +var SerialPubkey = next() + +func SerialPubkeyVars() (ser *types.Uint40) { + return new(types.Uint40) +} +func SerialPubkeyEnc(ser *types.Uint40) (enc *T) { + return New(NewPrefix(SerialPubkey), ser) +} +func SerialPubkeyDec(ser *types.Uint40) (enc *T) { + return New(NewPrefix(), ser) +} + +// EventPubkeyGraph creates a bidirectional graph edge between events and pubkeys +// This stores event_serial -> pubkey_serial relationships with event kind and direction +// Direction: 0=author, 1=p-tag-out (event references pubkey) +// +// 3 prefix|5 event serial|5 pubkey serial|2 kind|1 direction +var EventPubkeyGraph = next() + +func EventPubkeyGraphVars() (eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) { + return new(types.Uint40), new(types.Uint40), new(types.Uint16), new(types.Letter) +} +func EventPubkeyGraphEnc(eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) (enc *T) { + return New(NewPrefix(EventPubkeyGraph), eventSer, pubkeySer, kind, direction) +} +func EventPubkeyGraphDec(eventSer *types.Uint40, pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter) (enc *T) { + return New(NewPrefix(), eventSer, pubkeySer, kind, direction) +} + +// PubkeyEventGraph creates the reverse edge: pubkey_serial -> event_serial with event kind and direction +// This enables querying all events related to a pubkey, optionally filtered by kind and direction +// Direction: 0=is-author, 2=p-tag-in (pubkey is referenced by event) +// +// 3 prefix|5 pubkey serial|2 kind|1 direction|5 event serial +var PubkeyEventGraph = next() + +func PubkeyEventGraphVars() (pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) { + return new(types.Uint40), new(types.Uint16), new(types.Letter), new(types.Uint40) +} +func PubkeyEventGraphEnc(pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) (enc *T) { + return New(NewPrefix(PubkeyEventGraph), pubkeySer, kind, direction, eventSer) +} +func PubkeyEventGraphDec(pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) (enc *T) { + return New(NewPrefix(), pubkeySer, kind, direction, eventSer) +} diff --git a/pkg/database/indexes/types/letter.go b/pkg/database/indexes/types/letter.go index 3733917..c666ef5 100644 --- a/pkg/database/indexes/types/letter.go +++ b/pkg/database/indexes/types/letter.go @@ -8,6 +8,13 @@ import ( const LetterLen = 1 +// Edge direction constants for pubkey graph relationships +const ( + EdgeDirectionAuthor byte = 0 // The pubkey is the event author + EdgeDirectionPTagOut byte = 1 // Outbound: Event author references this pubkey in p-tag + EdgeDirectionPTagIn byte = 2 // Inbound: This pubkey is referenced in event's p-tag +) + type Letter struct { val byte } diff --git a/pkg/database/pubkey-graph_test.go b/pkg/database/pubkey-graph_test.go new file mode 100644 index 0000000..6223681 --- /dev/null +++ b/pkg/database/pubkey-graph_test.go @@ -0,0 +1,365 @@ +package database + +import ( + "context" + "testing" + + "github.com/dgraph-io/badger/v4" + "next.orly.dev/pkg/database/indexes" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/tag" +) + +func TestPubkeySerialAssignment(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, t.TempDir(), "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create a test pubkey + pubkey1 := make([]byte, 32) + for i := range pubkey1 { + pubkey1[i] = byte(i) + } + + // Get or create serial for the first time + t.Logf("First call: GetOrCreatePubkeySerial for pubkey %s", hex.Enc(pubkey1)) + ser1, err := db.GetOrCreatePubkeySerial(pubkey1) + if err != nil { + t.Fatalf("Failed to get or create pubkey serial: %v", err) + } + + if ser1 == nil { + t.Fatal("Serial should not be nil") + } + t.Logf("First call returned serial: %d", ser1.Get()) + + // Debug: List all keys in database + var keyCount int + db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().KeyCopy(nil) + t.Logf("Found key: %s (len=%d)", hex.Enc(key), len(key)) + keyCount++ + if keyCount > 20 { + break // Limit output + } + } + return nil + }) + t.Logf("Total keys found (first 20): %d", keyCount) + + // Debug: what prefix should we be looking for? + pubHash := new(types.PubHash) + pubHash.FromPubkey(pubkey1) + expectedPrefix := []byte(indexes.PubkeySerialPrefix) + t.Logf("Expected PubkeySerial prefix: %s = %s", string(expectedPrefix), hex.Enc(expectedPrefix)) + + // Try direct lookup + t.Logf("Direct lookup: GetPubkeySerial for same pubkey") + serDirect, err := db.GetPubkeySerial(pubkey1) + if err != nil { + t.Logf("Direct lookup failed: %v", err) + } else { + t.Logf("Direct lookup returned serial: %d", serDirect.Get()) + } + + // Get the same pubkey again - should return the same serial + t.Logf("Second call: GetOrCreatePubkeySerial for same pubkey") + ser2, err := db.GetOrCreatePubkeySerial(pubkey1) + if err != nil { + t.Fatalf("Failed to get existing pubkey serial: %v", err) + } + t.Logf("Second call returned serial: %d", ser2.Get()) + + if ser1.Get() != ser2.Get() { + t.Errorf("Expected same serial, got %d and %d", ser1.Get(), ser2.Get()) + } + + // Create a different pubkey + pubkey2 := make([]byte, 32) + for i := range pubkey2 { + pubkey2[i] = byte(i + 100) + } + + ser3, err := db.GetOrCreatePubkeySerial(pubkey2) + if err != nil { + t.Fatalf("Failed to get or create second pubkey serial: %v", err) + } + + if ser3.Get() == ser1.Get() { + t.Error("Different pubkeys should have different serials") + } + + // Test reverse lookup: serial -> pubkey + retrievedPubkey1, err := db.GetPubkeyBySerial(ser1) + if err != nil { + t.Fatalf("Failed to get pubkey by serial: %v", err) + } + + if hex.Enc(retrievedPubkey1) != hex.Enc(pubkey1) { + t.Errorf("Retrieved pubkey doesn't match. Expected %s, got %s", + hex.Enc(pubkey1), hex.Enc(retrievedPubkey1)) + } +} + +func TestEventPubkeyGraph(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, t.TempDir(), "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create test event with author and p-tags + authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001") + pTagPubkey1, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002") + pTagPubkey2, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000003") + + eventID := make([]byte, 32) + eventID[0] = 1 + eventSig := make([]byte, 64) + eventSig[0] = 1 + + ev := &event.E{ + ID: eventID, + Pubkey: authorPubkey, + CreatedAt: 1234567890, + Kind: 1, // text note + Content: []byte("Test event with p-tags"), + Sig: eventSig, + Tags: tag.NewS( + tag.NewFromAny("p", hex.Enc(pTagPubkey1)), + tag.NewFromAny("p", hex.Enc(pTagPubkey2)), + tag.NewFromAny("e", "someeventid"), + ), + } + + // Save the event - this should create pubkey serials and graph edges + _, err = db.SaveEvent(ctx, ev) + if err != nil { + t.Fatalf("Failed to save event: %v", err) + } + + // Verify that pubkey serials were created + authorSerial, err := db.GetPubkeySerial(authorPubkey) + if err != nil { + t.Fatalf("Failed to get author pubkey serial: %v", err) + } + if authorSerial == nil { + t.Fatal("Author serial should not be nil") + } + + pTag1Serial, err := db.GetPubkeySerial(pTagPubkey1) + if err != nil { + t.Fatalf("Failed to get p-tag1 pubkey serial: %v", err) + } + if pTag1Serial == nil { + t.Fatal("P-tag1 serial should not be nil") + } + + pTag2Serial, err := db.GetPubkeySerial(pTagPubkey2) + if err != nil { + t.Fatalf("Failed to get p-tag2 pubkey serial: %v", err) + } + if pTag2Serial == nil { + t.Fatal("P-tag2 serial should not be nil") + } + + // Verify all three pubkeys have different serials + if authorSerial.Get() == pTag1Serial.Get() || authorSerial.Get() == pTag2Serial.Get() || pTag1Serial.Get() == pTag2Serial.Get() { + t.Error("All pubkey serials should be unique") + } + + t.Logf("Event saved successfully with graph edges:") + t.Logf(" Author serial: %d", authorSerial.Get()) + t.Logf(" P-tag1 serial: %d", pTag1Serial.Get()) + t.Logf(" P-tag2 serial: %d", pTag2Serial.Get()) +} + +func TestMultipleEventsWithSamePubkeys(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, t.TempDir(), "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create two events from the same author mentioning the same person + authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001") + pTagPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002") + + eventID1 := make([]byte, 32) + eventID1[0] = 1 + eventSig1 := make([]byte, 64) + eventSig1[0] = 1 + + ev1 := &event.E{ + ID: eventID1, + Pubkey: authorPubkey, + CreatedAt: 1234567890, + Kind: 1, + Content: []byte("First event"), + Sig: eventSig1, + Tags: tag.NewS( + tag.NewFromAny("p", hex.Enc(pTagPubkey)), + ), + } + + eventID2 := make([]byte, 32) + eventID2[0] = 2 + eventSig2 := make([]byte, 64) + eventSig2[0] = 2 + + ev2 := &event.E{ + ID: eventID2, + Pubkey: authorPubkey, + CreatedAt: 1234567891, + Kind: 1, + Content: []byte("Second event"), + Sig: eventSig2, + Tags: tag.NewS( + tag.NewFromAny("p", hex.Enc(pTagPubkey)), + ), + } + + // Save both events + _, err = db.SaveEvent(ctx, ev1) + if err != nil { + t.Fatalf("Failed to save event 1: %v", err) + } + + _, err = db.SaveEvent(ctx, ev2) + if err != nil { + t.Fatalf("Failed to save event 2: %v", err) + } + + // Verify the same pubkeys got the same serials + authorSerial1, _ := db.GetPubkeySerial(authorPubkey) + pTagSerial1, _ := db.GetPubkeySerial(pTagPubkey) + + if authorSerial1 == nil || pTagSerial1 == nil { + t.Fatal("Pubkey serials should exist after saving events") + } + + t.Logf("Both events share the same pubkey serials:") + t.Logf(" Author serial: %d", authorSerial1.Get()) + t.Logf(" P-tag serial: %d", pTagSerial1.Get()) +} + +func TestPubkeySerialEdgeCases(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, t.TempDir(), "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Test with invalid pubkey length + invalidPubkey := make([]byte, 16) // Wrong length + _, err = db.GetOrCreatePubkeySerial(invalidPubkey) + if err == nil { + t.Error("Should reject pubkey with invalid length") + } + + // Test GetPubkeySerial for non-existent pubkey + nonExistentPubkey := make([]byte, 32) + for i := range nonExistentPubkey { + nonExistentPubkey[i] = 0xFF + } + + _, err = db.GetPubkeySerial(nonExistentPubkey) + if err == nil { + t.Error("Should return error for non-existent pubkey serial") + } +} + +func TestGraphEdgeDirections(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, t.TempDir(), "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create test event with author and p-tags + authorPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000001") + pTagPubkey, _ := hex.Dec("0000000000000000000000000000000000000000000000000000000000000002") + + eventID := make([]byte, 32) + eventID[0] = 1 + eventSig := make([]byte, 64) + eventSig[0] = 1 + + ev := &event.E{ + ID: eventID, + Pubkey: authorPubkey, + CreatedAt: 1234567890, + Kind: 1, // text note + Content: []byte("Test event"), + Sig: eventSig, + Tags: tag.NewS( + tag.NewFromAny("p", hex.Enc(pTagPubkey)), + ), + } + + // Save the event + _, err = db.SaveEvent(ctx, ev) + if err != nil { + t.Fatalf("Failed to save event: %v", err) + } + + // Verify graph edges with correct direction bytes + // Look for PubkeyEventGraph keys and check direction byte + var foundAuthorEdge, foundPTagEdge bool + db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + + prefix := []byte(indexes.PubkeyEventGraphPrefix) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + key := it.Item().KeyCopy(nil) + // Key format: peg(3)|pubkey_serial(5)|kind(2)|direction(1)|event_serial(5) = 16 bytes + if len(key) == 16 { + direction := key[10] // Byte at position 10 is the direction + t.Logf("Found PubkeyEventGraph edge: key=%s, direction=%d", hex.Enc(key), direction) + + if direction == types.EdgeDirectionAuthor { + foundAuthorEdge = true + t.Logf(" ✓ Found author edge (direction=0)") + } else if direction == types.EdgeDirectionPTagIn { + foundPTagEdge = true + t.Logf(" ✓ Found p-tag inbound edge (direction=2)") + } + } + } + return nil + }) + + if !foundAuthorEdge { + t.Error("Did not find author edge with direction=0") + } + if !foundPTagEdge { + t.Error("Did not find p-tag inbound edge with direction=2") + } + + t.Logf("Graph edges correctly stored with direction bytes:") + t.Logf(" Author edge: %v (direction=0)", foundAuthorEdge) + t.Logf(" P-tag inbound edge: %v (direction=2)", foundPTagEdge) +} diff --git a/pkg/database/pubkey-serial.go b/pkg/database/pubkey-serial.go new file mode 100644 index 0000000..cd14ff8 --- /dev/null +++ b/pkg/database/pubkey-serial.go @@ -0,0 +1,197 @@ +package database + +import ( + "bytes" + "errors" + + "github.com/dgraph-io/badger/v4" + "lol.mleku.dev/chk" + "next.orly.dev/pkg/database/indexes" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/hex" +) + +// GetOrCreatePubkeySerial returns the serial for a pubkey, creating one if it doesn't exist. +// The pubkey parameter should be 32 bytes (schnorr public key). +// This function is thread-safe and uses transactions to ensure atomicity. +func (d *D) GetOrCreatePubkeySerial(pubkey []byte) (ser *types.Uint40, err error) { + if len(pubkey) != 32 { + err = errors.New("pubkey must be 32 bytes") + return + } + + // Create pubkey hash + pubHash := new(types.PubHash) + if err = pubHash.FromPubkey(pubkey); chk.E(err) { + return + } + + // First, try to get existing serial (separate transaction for read) + var existingSer *types.Uint40 + existingSer, err = d.GetPubkeySerial(pubkey) + if err == nil && existingSer != nil { + // Serial already exists + ser = existingSer + return ser, nil + } + + // Serial doesn't exist, create a new one + var serial uint64 + if serial, err = d.pubkeySeq.Next(); chk.E(err) { + return + } + + ser = new(types.Uint40) + if err = ser.Set(serial); chk.E(err) { + return + } + + // Store both mappings in a transaction + err = d.Update(func(txn *badger.Txn) error { + // Double-check that the serial wasn't created by another goroutine + // while we were getting the sequence number + prefixBuf := new(bytes.Buffer) + prefixBuf.Write([]byte(indexes.PubkeySerialPrefix)) + if terr := pubHash.MarshalWrite(prefixBuf); chk.E(terr) { + return terr + } + searchPrefix := prefixBuf.Bytes() + + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + opts.Prefix = searchPrefix + it := txn.NewIterator(opts) + it.Seek(searchPrefix) + if it.Valid() { + // Another goroutine created it, extract and return that serial + key := it.Item().KeyCopy(nil) + it.Close() + if len(key) == 16 { + serialBytes := key[11:16] + serialBuf := bytes.NewReader(serialBytes) + existSer := new(types.Uint40) + if terr := existSer.UnmarshalRead(serialBuf); terr == nil { + ser = existSer + return nil // Don't write, just return the existing serial + } + } + } + it.Close() + + // Store pubkey hash -> serial mapping + keyBuf := new(bytes.Buffer) + if terr := indexes.PubkeySerialEnc(pubHash, ser).MarshalWrite(keyBuf); chk.E(terr) { + return terr + } + fullKey := make([]byte, len(keyBuf.Bytes())) + copy(fullKey, keyBuf.Bytes()) + // DEBUG: log the key being written + if len(fullKey) > 0 { + // log.T.F("Writing PubkeySerial: key=%s (len=%d), prefix=%s", hex.Enc(fullKey), len(fullKey), string(fullKey[:3])) + } + if terr := txn.Set(fullKey, nil); chk.E(terr) { + return terr + } + + // Store serial -> full pubkey mapping (pubkey stored as value) + keyBuf.Reset() + if terr := indexes.SerialPubkeyEnc(ser).MarshalWrite(keyBuf); chk.E(terr) { + return terr + } + if terr := txn.Set(keyBuf.Bytes(), pubkey); chk.E(terr) { + return terr + } + + return nil + }) + + return +} + +// GetPubkeySerial returns the serial for a pubkey if it exists. +// Returns an error if the pubkey doesn't have a serial yet. +func (d *D) GetPubkeySerial(pubkey []byte) (ser *types.Uint40, err error) { + if len(pubkey) != 32 { + err = errors.New("pubkey must be 32 bytes") + return + } + + // Create pubkey hash + pubHash := new(types.PubHash) + if err = pubHash.FromPubkey(pubkey); chk.E(err) { + return + } + + // Build search key with just prefix + pubkey hash (no serial) + prefixBuf := new(bytes.Buffer) + prefixBuf.Write([]byte(indexes.PubkeySerialPrefix)) // 3 bytes + if err = pubHash.MarshalWrite(prefixBuf); chk.E(err) { + return + } + searchPrefix := prefixBuf.Bytes() // Should be 11 bytes: 3 (prefix) + 8 (pubkey hash) + + ser = new(types.Uint40) + err = d.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false // We only need the key + it := txn.NewIterator(opts) + defer it.Close() + + // Seek to the prefix and check if we found a matching key + it.Seek(searchPrefix) + if !it.ValidForPrefix(searchPrefix) { + return errors.New("pubkey serial not found") + } + + // Extract serial from key (last 5 bytes) + // Key format: prefix(3) + pubkey_hash(8) + serial(5) = 16 bytes + key := it.Item().KeyCopy(nil) + if len(key) != 16 { + return errors.New("invalid key length for pubkey serial") + } + + // Verify the prefix matches + if !bytes.HasPrefix(key, searchPrefix) { + return errors.New("key prefix mismatch") + } + + serialBytes := key[11:16] // Extract last 5 bytes (the serial) + + // Decode serial + serialBuf := bytes.NewReader(serialBytes) + if err := ser.UnmarshalRead(serialBuf); chk.E(err) { + return err + } + + return nil + }) + + return +} + +// GetPubkeyBySerial returns the full 32-byte pubkey for a given serial. +func (d *D) GetPubkeyBySerial(ser *types.Uint40) (pubkey []byte, err error) { + keyBuf := new(bytes.Buffer) + if err = indexes.SerialPubkeyEnc(ser).MarshalWrite(keyBuf); chk.E(err) { + return + } + + err = d.View(func(txn *badger.Txn) error { + item, gerr := txn.Get(keyBuf.Bytes()) + if chk.E(gerr) { + return gerr + } + + return item.Value(func(val []byte) error { + pubkey = make([]byte, len(val)) + copy(pubkey, val) + return nil + }) + }) + + if err != nil { + err = errors.New("pubkey not found for serial: " + hex.Enc([]byte{byte(ser.Get())})) + } + + return +} diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go index 030e93b..aed7cfd 100644 --- a/pkg/database/save-event.go +++ b/pkg/database/save-event.go @@ -180,6 +180,47 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) { return } + + // Collect all pubkeys for graph: author + p-tags + // Store with direction indicator: author (0) vs p-tag (1) + type pubkeyWithDirection struct { + serial *types.Uint40 + isAuthor bool + } + pubkeysForGraph := make(map[string]pubkeyWithDirection) + + // Add author pubkey + var authorSerial *types.Uint40 + if authorSerial, err = d.GetOrCreatePubkeySerial(ev.Pubkey); chk.E(err) { + return + } + pubkeysForGraph[hex.Enc(ev.Pubkey)] = pubkeyWithDirection{ + serial: authorSerial, + isAuthor: true, + } + + // Extract p-tag pubkeys using GetAll + pTags := ev.Tags.GetAll([]byte("p")) + for _, pTag := range pTags { + if len(pTag.T) >= 2 { + // Decode hex pubkey from p-tag + var ptagPubkey []byte + if ptagPubkey, err = hex.Dec(string(pTag.T[tag.Value])); err == nil && len(ptagPubkey) == 32 { + pkHex := hex.Enc(ptagPubkey) + // Skip if already added as author + if _, exists := pubkeysForGraph[pkHex]; !exists { + var ptagSerial *types.Uint40 + if ptagSerial, err = d.GetOrCreatePubkeySerial(ptagPubkey); chk.E(err) { + return + } + pubkeysForGraph[pkHex] = pubkeyWithDirection{ + serial: ptagSerial, + isAuthor: false, + } + } + } + } + } // log.T.F( // "SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs), // ev.ID, ev.Kind, @@ -320,6 +361,48 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( } log.T.F("SaveEvent: also stored replaceable event with specialized key") } + + // Create graph edges between event and all related pubkeys + // This creates bidirectional edges: event->pubkey and pubkey->event + // Include the event kind and direction for efficient graph queries + eventKind := new(types.Uint16) + eventKind.Set(ev.Kind) + + for _, pkInfo := range pubkeysForGraph { + // Determine direction for forward edge (event -> pubkey perspective) + directionForward := new(types.Letter) + // Determine direction for reverse edge (pubkey -> event perspective) + directionReverse := new(types.Letter) + + if pkInfo.isAuthor { + // Event author relationship + directionForward.Set(types.EdgeDirectionAuthor) // 0: author + directionReverse.Set(types.EdgeDirectionAuthor) // 0: is author of event + } else { + // P-tag relationship + directionForward.Set(types.EdgeDirectionPTagOut) // 1: event references pubkey (outbound) + directionReverse.Set(types.EdgeDirectionPTagIn) // 2: pubkey is referenced (inbound) + } + + // Create event -> pubkey edge (with kind and direction) + keyBuf := new(bytes.Buffer) + if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(keyBuf); chk.E(err) { + return + } + if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) { + return + } + + // Create pubkey -> event edge (reverse, with kind and direction for filtering) + keyBuf.Reset() + if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(keyBuf); chk.E(err) { + return + } + if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) { + return + } + } + return }, )