From ba84e12ea9d9c06ba0cb2057aec80c52191c2241 Mon Sep 17 00:00:00 2001 From: mleku Date: Fri, 12 Dec 2025 07:07:31 +0100 Subject: [PATCH] Add _graph extension support to Neo4j driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement TraverseFollows using Cypher path queries on FOLLOWS relationships - Implement TraverseFollowers using reverse path traversal - Implement FindMentions using MENTIONS relationships from p-tags - Implement TraverseThread using REFERENCES relationships from e-tags with bidirectional traversal (inbound replies, outbound parents) - Add GraphAdapter to bridge Neo4j to graph.GraphDatabase interface - Add GraphResult type implementing graph.GraphResultI for Neo4j - Initialize graph executor for Neo4j backend in app/main.go The implementation uses existing Neo4j schema and relationships created by SaveEvent() - no schema changes required. The _graph extension now works transparently with either Badger or Neo4j backends. Bump version to v0.35.0 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app/main.go | 22 ++- pkg/neo4j/graph-adapter.go | 40 ++++++ pkg/neo4j/graph-follows.go | 201 ++++++++++++++++++++++++++ pkg/neo4j/graph-mentions.go | 143 +++++++++++++++++++ pkg/neo4j/graph-result.go | 197 +++++++++++++++++++++++++ pkg/neo4j/graph-thread.go | 277 ++++++++++++++++++++++++++++++++++++ pkg/version/version | 2 +- 7 files changed, 879 insertions(+), 3 deletions(-) create mode 100644 pkg/neo4j/graph-adapter.go create mode 100644 pkg/neo4j/graph-follows.go create mode 100644 pkg/neo4j/graph-mentions.go create mode 100644 pkg/neo4j/graph-result.go create mode 100644 pkg/neo4j/graph-thread.go diff --git a/app/main.go b/app/main.go index 080b6d7..d87e156 100644 --- a/app/main.go +++ b/app/main.go @@ -17,6 +17,7 @@ import ( "git.mleku.dev/mleku/nostr/crypto/keys" "next.orly.dev/pkg/database" "git.mleku.dev/mleku/nostr/encoders/bech32encoding" + "next.orly.dev/pkg/neo4j" "next.orly.dev/pkg/policy" "next.orly.dev/pkg/protocol/graph" "next.orly.dev/pkg/protocol/nip43" @@ -123,7 +124,7 @@ func Run( } } - // Initialize graph query executor (only for Badger backend) + // Initialize graph query executor (Badger backend) if badgerDB, ok := db.(*database.D); ok { // Get relay identity key for signing graph query responses relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret() @@ -135,7 +136,24 @@ func Run( if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil { log.E.F("failed to create graph executor: %v", err) } else { - log.I.F("graph query executor initialized") + log.I.F("graph query executor initialized (Badger backend)") + } + } + } + + // Initialize graph query executor (Neo4j backend) + if neo4jDB, ok := db.(*neo4j.N); ok { + // Get relay identity key for signing graph query responses + relaySecretKey, err := neo4jDB.GetOrCreateRelayIdentitySecret() + if err != nil { + log.E.F("failed to get relay identity key for graph executor: %v", err) + } else { + // Create the graph adapter and executor + graphAdapter := neo4j.NewGraphAdapter(neo4jDB) + if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil { + log.E.F("failed to create graph executor: %v", err) + } else { + log.I.F("graph query executor initialized (Neo4j backend)") } } } diff --git a/pkg/neo4j/graph-adapter.go b/pkg/neo4j/graph-adapter.go new file mode 100644 index 0000000..7902bdc --- /dev/null +++ b/pkg/neo4j/graph-adapter.go @@ -0,0 +1,40 @@ +package neo4j + +import ( + "next.orly.dev/pkg/protocol/graph" +) + +// GraphAdapter wraps a Neo4j database instance and implements graph.GraphDatabase interface. +// This allows the graph executor to call database traversal methods without +// the database package importing the graph package. +type GraphAdapter struct { + db *N +} + +// NewGraphAdapter creates a new GraphAdapter wrapping the given Neo4j database. +func NewGraphAdapter(db *N) *GraphAdapter { + return &GraphAdapter{db: db} +} + +// TraverseFollows implements graph.GraphDatabase. +func (a *GraphAdapter) TraverseFollows(seedPubkey []byte, maxDepth int) (graph.GraphResultI, error) { + return a.db.TraverseFollows(seedPubkey, maxDepth) +} + +// TraverseFollowers implements graph.GraphDatabase. +func (a *GraphAdapter) TraverseFollowers(seedPubkey []byte, maxDepth int) (graph.GraphResultI, error) { + return a.db.TraverseFollowers(seedPubkey, maxDepth) +} + +// FindMentions implements graph.GraphDatabase. +func (a *GraphAdapter) FindMentions(pubkey []byte, kinds []uint16) (graph.GraphResultI, error) { + return a.db.FindMentions(pubkey, kinds) +} + +// TraverseThread implements graph.GraphDatabase. +func (a *GraphAdapter) TraverseThread(seedEventID []byte, maxDepth int, direction string) (graph.GraphResultI, error) { + return a.db.TraverseThread(seedEventID, maxDepth, direction) +} + +// Verify GraphAdapter implements graph.GraphDatabase +var _ graph.GraphDatabase = (*GraphAdapter)(nil) diff --git a/pkg/neo4j/graph-follows.go b/pkg/neo4j/graph-follows.go new file mode 100644 index 0000000..7109ab4 --- /dev/null +++ b/pkg/neo4j/graph-follows.go @@ -0,0 +1,201 @@ +package neo4j + +import ( + "context" + "fmt" + "strings" + + "git.mleku.dev/mleku/nostr/encoders/hex" + "next.orly.dev/pkg/protocol/graph" +) + +// TraverseFollows performs BFS traversal of the follow graph starting from a seed pubkey. +// Returns pubkeys grouped by first-discovered depth (no duplicates across depths). +// +// Uses Neo4j's native path queries with FOLLOWS relationships created by +// the social event processor from kind 3 contact list events. +// +// The traversal works by using variable-length path patterns: +// - Depth 1: Direct follows (seed)-[:FOLLOWS]->(followed) +// - Depth 2: Follows of follows (seed)-[:FOLLOWS*2]->(followed) +// - etc. +// +// Each pubkey appears only at the depth where it was first discovered. +func (n *N) TraverseFollows(seedPubkey []byte, maxDepth int) (graph.GraphResultI, error) { + result := NewGraphResult() + + if len(seedPubkey) != 32 { + return result, fmt.Errorf("invalid pubkey length: expected 32, got %d", len(seedPubkey)) + } + + seedHex := strings.ToLower(hex.Enc(seedPubkey)) + ctx := context.Background() + + // Track visited pubkeys to ensure each appears only at first-discovered depth + visited := make(map[string]bool) + visited[seedHex] = true // Seed is at depth 0, not included in results + + // Process each depth level separately to maintain BFS semantics + for depth := 1; depth <= maxDepth; depth++ { + // Query for pubkeys at exactly this depth that haven't been seen yet + // We use a variable-length path of exactly 'depth' hops + cypher := fmt.Sprintf(` + MATCH path = (seed:NostrUser {pubkey: $seed})-[:FOLLOWS*%d]->(target:NostrUser) + WHERE target.pubkey <> $seed + AND NOT target.pubkey IN $visited + RETURN DISTINCT target.pubkey AS pubkey + `, depth) + + // Convert visited map to slice for query + visitedList := make([]string, 0, len(visited)) + for pk := range visited { + visitedList = append(visitedList, pk) + } + + params := map[string]any{ + "seed": seedHex, + "visited": visitedList, + } + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + n.Logger.Warningf("TraverseFollows: error at depth %d: %v", depth, err) + continue + } + + newPubkeysAtDepth := 0 + for queryResult.Next(ctx) { + record := queryResult.Record() + pubkey, ok := record.Values[0].(string) + if !ok || pubkey == "" { + continue + } + + // Normalize to lowercase for consistency + pubkey = strings.ToLower(pubkey) + + // Add to result if not already seen + if !visited[pubkey] { + visited[pubkey] = true + result.AddPubkeyAtDepth(pubkey, depth) + newPubkeysAtDepth++ + } + } + + n.Logger.Debugf("TraverseFollows: depth %d found %d new pubkeys", depth, newPubkeysAtDepth) + + // Early termination if no new pubkeys found at this depth + if newPubkeysAtDepth == 0 { + break + } + } + + n.Logger.Debugf("TraverseFollows: completed with %d total pubkeys across %d depths", + result.TotalPubkeys, len(result.PubkeysByDepth)) + + return result, nil +} + +// TraverseFollowers performs BFS traversal to find who follows the seed pubkey. +// This is the reverse of TraverseFollows - it finds users whose kind-3 lists +// contain the target pubkey(s). +// +// Uses Neo4j's native path queries, but in reverse direction: +// - Depth 1: Users who directly follow the seed (follower)-[:FOLLOWS]->(seed) +// - Depth 2: Users who follow anyone at depth 1 (followers of followers) +// - etc. +func (n *N) TraverseFollowers(seedPubkey []byte, maxDepth int) (graph.GraphResultI, error) { + result := NewGraphResult() + + if len(seedPubkey) != 32 { + return result, fmt.Errorf("invalid pubkey length: expected 32, got %d", len(seedPubkey)) + } + + seedHex := strings.ToLower(hex.Enc(seedPubkey)) + ctx := context.Background() + + // Track visited pubkeys + visited := make(map[string]bool) + visited[seedHex] = true + + // Process each depth level separately for BFS semantics + for depth := 1; depth <= maxDepth; depth++ { + // Query for pubkeys at exactly this depth that haven't been seen yet + // Direction is reversed: we find users who follow the targets + cypher := fmt.Sprintf(` + MATCH path = (follower:NostrUser)-[:FOLLOWS*%d]->(seed:NostrUser {pubkey: $seed}) + WHERE follower.pubkey <> $seed + AND NOT follower.pubkey IN $visited + RETURN DISTINCT follower.pubkey AS pubkey + `, depth) + + visitedList := make([]string, 0, len(visited)) + for pk := range visited { + visitedList = append(visitedList, pk) + } + + params := map[string]any{ + "seed": seedHex, + "visited": visitedList, + } + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + n.Logger.Warningf("TraverseFollowers: error at depth %d: %v", depth, err) + continue + } + + newPubkeysAtDepth := 0 + for queryResult.Next(ctx) { + record := queryResult.Record() + pubkey, ok := record.Values[0].(string) + if !ok || pubkey == "" { + continue + } + + pubkey = strings.ToLower(pubkey) + + if !visited[pubkey] { + visited[pubkey] = true + result.AddPubkeyAtDepth(pubkey, depth) + newPubkeysAtDepth++ + } + } + + n.Logger.Debugf("TraverseFollowers: depth %d found %d new pubkeys", depth, newPubkeysAtDepth) + + if newPubkeysAtDepth == 0 { + break + } + } + + n.Logger.Debugf("TraverseFollowers: completed with %d total pubkeys", result.TotalPubkeys) + + return result, nil +} + +// TraverseFollowsFromHex is a convenience wrapper that accepts hex-encoded pubkey. +func (n *N) TraverseFollowsFromHex(seedPubkeyHex string, maxDepth int) (*GraphResult, error) { + seedPubkey, err := hex.Dec(seedPubkeyHex) + if err != nil { + return nil, err + } + result, err := n.TraverseFollows(seedPubkey, maxDepth) + if err != nil { + return nil, err + } + return result.(*GraphResult), nil +} + +// TraverseFollowersFromHex is a convenience wrapper that accepts hex-encoded pubkey. +func (n *N) TraverseFollowersFromHex(seedPubkeyHex string, maxDepth int) (*GraphResult, error) { + seedPubkey, err := hex.Dec(seedPubkeyHex) + if err != nil { + return nil, err + } + result, err := n.TraverseFollowers(seedPubkey, maxDepth) + if err != nil { + return nil, err + } + return result.(*GraphResult), nil +} diff --git a/pkg/neo4j/graph-mentions.go b/pkg/neo4j/graph-mentions.go new file mode 100644 index 0000000..5e53ea4 --- /dev/null +++ b/pkg/neo4j/graph-mentions.go @@ -0,0 +1,143 @@ +package neo4j + +import ( + "context" + "fmt" + "strings" + + "git.mleku.dev/mleku/nostr/encoders/hex" + "next.orly.dev/pkg/protocol/graph" +) + +// FindMentions finds events that mention a pubkey via p-tags. +// This returns events grouped by depth, where depth represents how the events relate: +// - Depth 1: Events that directly mention the seed pubkey +// - Depth 2+: Not typically used for mentions (reserved for future expansion) +// +// The kinds parameter filters which event kinds to include (e.g., [1] for notes only, +// [1,7] for notes and reactions, etc.) +// +// Uses Neo4j MENTIONS relationships created by SaveEvent when processing p-tags. +func (n *N) FindMentions(pubkey []byte, kinds []uint16) (graph.GraphResultI, error) { + result := NewGraphResult() + + if len(pubkey) != 32 { + return result, fmt.Errorf("invalid pubkey length: expected 32, got %d", len(pubkey)) + } + + pubkeyHex := strings.ToLower(hex.Enc(pubkey)) + ctx := context.Background() + + // Build kinds filter if specified + var kindsFilter string + params := map[string]any{ + "pubkey": pubkeyHex, + } + + if len(kinds) > 0 { + // Convert uint16 slice to int64 slice for Neo4j + kindsInt := make([]int64, len(kinds)) + for i, k := range kinds { + kindsInt[i] = int64(k) + } + params["kinds"] = kindsInt + kindsFilter = "AND e.kind IN $kinds" + } + + // Query for events that mention this pubkey + // The MENTIONS relationship is created by SaveEvent when processing p-tags + cypher := fmt.Sprintf(` + MATCH (e:Event)-[:MENTIONS]->(u:NostrUser {pubkey: $pubkey}) + WHERE true %s + RETURN e.id AS event_id + ORDER BY e.created_at DESC + `, kindsFilter) + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return result, fmt.Errorf("failed to query mentions: %w", err) + } + + // Add all found events at depth 1 + for queryResult.Next(ctx) { + record := queryResult.Record() + eventID, ok := record.Values[0].(string) + if !ok || eventID == "" { + continue + } + + // Normalize to lowercase for consistency + eventID = strings.ToLower(eventID) + result.AddEventAtDepth(eventID, 1) + } + + n.Logger.Debugf("FindMentions: found %d events mentioning pubkey %s", result.TotalEvents, safePrefix(pubkeyHex, 16)) + + return result, nil +} + +// FindMentionsFromHex is a convenience wrapper that accepts hex-encoded pubkey. +func (n *N) FindMentionsFromHex(pubkeyHex string, kinds []uint16) (*GraphResult, error) { + pubkey, err := hex.Dec(pubkeyHex) + if err != nil { + return nil, err + } + result, err := n.FindMentions(pubkey, kinds) + if err != nil { + return nil, err + } + return result.(*GraphResult), nil +} + +// FindMentionsByPubkeys returns events that mention any of the given pubkeys. +// Useful for finding mentions across a set of followed accounts. +func (n *N) FindMentionsByPubkeys(pubkeys []string, kinds []uint16) (*GraphResult, error) { + result := NewGraphResult() + + if len(pubkeys) == 0 { + return result, nil + } + + ctx := context.Background() + + // Build kinds filter if specified + var kindsFilter string + params := map[string]any{ + "pubkeys": pubkeys, + } + + if len(kinds) > 0 { + kindsInt := make([]int64, len(kinds)) + for i, k := range kinds { + kindsInt[i] = int64(k) + } + params["kinds"] = kindsInt + kindsFilter = "AND e.kind IN $kinds" + } + + // Query for events that mention any of the pubkeys + cypher := fmt.Sprintf(` + MATCH (e:Event)-[:MENTIONS]->(u:NostrUser) + WHERE u.pubkey IN $pubkeys %s + RETURN DISTINCT e.id AS event_id + ORDER BY e.created_at DESC + `, kindsFilter) + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return result, fmt.Errorf("failed to query mentions: %w", err) + } + + for queryResult.Next(ctx) { + record := queryResult.Record() + eventID, ok := record.Values[0].(string) + if !ok || eventID == "" { + continue + } + + eventID = strings.ToLower(eventID) + result.AddEventAtDepth(eventID, 1) + } + + return result, nil +} diff --git a/pkg/neo4j/graph-result.go b/pkg/neo4j/graph-result.go new file mode 100644 index 0000000..2e44003 --- /dev/null +++ b/pkg/neo4j/graph-result.go @@ -0,0 +1,197 @@ +package neo4j + +import ( + "sort" +) + +// GraphResult contains depth-organized traversal results for graph queries. +// It tracks pubkeys and events discovered at each depth level, ensuring +// each entity appears only at the depth where it was first discovered. +// +// This is the Neo4j implementation that mirrors the Badger implementation +// in pkg/database/graph-result.go, implementing the graph.GraphResultI interface. +type GraphResult struct { + // PubkeysByDepth maps depth -> pubkeys first discovered at that depth. + // Each pubkey appears ONLY in the array for the depth where it was first seen. + // Depth 1 = direct connections, Depth 2 = connections of connections, etc. + PubkeysByDepth map[int][]string + + // EventsByDepth maps depth -> event IDs discovered at that depth. + // Used for thread traversal queries. + EventsByDepth map[int][]string + + // FirstSeenPubkey tracks which depth each pubkey was first discovered. + // Key is pubkey hex, value is the depth (1-indexed). + FirstSeenPubkey map[string]int + + // FirstSeenEvent tracks which depth each event was first discovered. + // Key is event ID hex, value is the depth (1-indexed). + FirstSeenEvent map[string]int + + // TotalPubkeys is the count of unique pubkeys discovered across all depths. + TotalPubkeys int + + // TotalEvents is the count of unique events discovered across all depths. + TotalEvents int +} + +// NewGraphResult creates a new initialized GraphResult. +func NewGraphResult() *GraphResult { + return &GraphResult{ + PubkeysByDepth: make(map[int][]string), + EventsByDepth: make(map[int][]string), + FirstSeenPubkey: make(map[string]int), + FirstSeenEvent: make(map[string]int), + } +} + +// AddPubkeyAtDepth adds a pubkey to the result at the specified depth if not already seen. +// Returns true if the pubkey was added (first time seen), false if already exists. +func (r *GraphResult) AddPubkeyAtDepth(pubkeyHex string, depth int) bool { + if _, exists := r.FirstSeenPubkey[pubkeyHex]; exists { + return false + } + + r.FirstSeenPubkey[pubkeyHex] = depth + r.PubkeysByDepth[depth] = append(r.PubkeysByDepth[depth], pubkeyHex) + r.TotalPubkeys++ + return true +} + +// AddEventAtDepth adds an event ID to the result at the specified depth if not already seen. +// Returns true if the event was added (first time seen), false if already exists. +func (r *GraphResult) AddEventAtDepth(eventIDHex string, depth int) bool { + if _, exists := r.FirstSeenEvent[eventIDHex]; exists { + return false + } + + r.FirstSeenEvent[eventIDHex] = depth + r.EventsByDepth[depth] = append(r.EventsByDepth[depth], eventIDHex) + r.TotalEvents++ + return true +} + +// HasPubkey returns true if the pubkey has been discovered at any depth. +func (r *GraphResult) HasPubkey(pubkeyHex string) bool { + _, exists := r.FirstSeenPubkey[pubkeyHex] + return exists +} + +// HasEvent returns true if the event has been discovered at any depth. +func (r *GraphResult) HasEvent(eventIDHex string) bool { + _, exists := r.FirstSeenEvent[eventIDHex] + return exists +} + +// ToDepthArrays converts the result to the response format: array of arrays. +// Index 0 = depth 1 pubkeys, Index 1 = depth 2 pubkeys, etc. +// Empty arrays are included for depths with no pubkeys to maintain index alignment. +func (r *GraphResult) ToDepthArrays() [][]string { + if len(r.PubkeysByDepth) == 0 { + return [][]string{} + } + + // Find the maximum depth + maxDepth := 0 + for d := range r.PubkeysByDepth { + if d > maxDepth { + maxDepth = d + } + } + + // Create result array with entries for each depth + result := make([][]string, maxDepth) + for i := 0; i < maxDepth; i++ { + depth := i + 1 // depths are 1-indexed + if pubkeys, exists := r.PubkeysByDepth[depth]; exists { + result[i] = pubkeys + } else { + result[i] = []string{} // Empty array for depths with no pubkeys + } + } + return result +} + +// ToEventDepthArrays converts event results to the response format: array of arrays. +// Index 0 = depth 1 events, Index 1 = depth 2 events, etc. +func (r *GraphResult) ToEventDepthArrays() [][]string { + if len(r.EventsByDepth) == 0 { + return [][]string{} + } + + maxDepth := 0 + for d := range r.EventsByDepth { + if d > maxDepth { + maxDepth = d + } + } + + result := make([][]string, maxDepth) + for i := 0; i < maxDepth; i++ { + depth := i + 1 + if events, exists := r.EventsByDepth[depth]; exists { + result[i] = events + } else { + result[i] = []string{} + } + } + return result +} + +// GetAllPubkeys returns all pubkeys discovered across all depths. +func (r *GraphResult) GetAllPubkeys() []string { + all := make([]string, 0, r.TotalPubkeys) + for _, pubkeys := range r.PubkeysByDepth { + all = append(all, pubkeys...) + } + return all +} + +// GetAllEvents returns all event IDs discovered across all depths. +func (r *GraphResult) GetAllEvents() []string { + all := make([]string, 0, r.TotalEvents) + for _, events := range r.EventsByDepth { + all = append(all, events...) + } + return all +} + +// GetPubkeysByDepth returns the PubkeysByDepth map for external access. +func (r *GraphResult) GetPubkeysByDepth() map[int][]string { + return r.PubkeysByDepth +} + +// GetEventsByDepth returns the EventsByDepth map for external access. +func (r *GraphResult) GetEventsByDepth() map[int][]string { + return r.EventsByDepth +} + +// GetTotalPubkeys returns the total pubkey count for external access. +func (r *GraphResult) GetTotalPubkeys() int { + return r.TotalPubkeys +} + +// GetTotalEvents returns the total event count for external access. +func (r *GraphResult) GetTotalEvents() int { + return r.TotalEvents +} + +// GetDepthsSorted returns all depths that have pubkeys, sorted ascending. +func (r *GraphResult) GetDepthsSorted() []int { + depths := make([]int, 0, len(r.PubkeysByDepth)) + for d := range r.PubkeysByDepth { + depths = append(depths, d) + } + sort.Ints(depths) + return depths +} + +// GetEventDepthsSorted returns all depths that have events, sorted ascending. +func (r *GraphResult) GetEventDepthsSorted() []int { + depths := make([]int, 0, len(r.EventsByDepth)) + for d := range r.EventsByDepth { + depths = append(depths, d) + } + sort.Ints(depths) + return depths +} diff --git a/pkg/neo4j/graph-thread.go b/pkg/neo4j/graph-thread.go new file mode 100644 index 0000000..e6efc8d --- /dev/null +++ b/pkg/neo4j/graph-thread.go @@ -0,0 +1,277 @@ +package neo4j + +import ( + "context" + "fmt" + "strings" + + "git.mleku.dev/mleku/nostr/encoders/hex" + "next.orly.dev/pkg/protocol/graph" +) + +// TraverseThread performs BFS traversal of thread structure via e-tags. +// Starting from a seed event, it finds all replies/references at each depth. +// +// The traversal works bidirectionally using REFERENCES relationships: +// - Inbound: Events that reference the seed (replies, reactions, reposts) +// - Outbound: Events that the seed references (parents, quoted posts) +// +// Note: REFERENCES relationships are only created if the referenced event exists +// in the database at the time of saving. This means some references may be missing +// if events were stored out of order. +// +// Parameters: +// - seedEventID: The event ID to start traversal from +// - maxDepth: Maximum depth to traverse +// - direction: "both" (default), "inbound" (replies to seed), "outbound" (seed's references) +func (n *N) TraverseThread(seedEventID []byte, maxDepth int, direction string) (graph.GraphResultI, error) { + result := NewGraphResult() + + if len(seedEventID) != 32 { + return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(seedEventID)) + } + + seedHex := strings.ToLower(hex.Enc(seedEventID)) + ctx := context.Background() + + // Normalize direction + if direction == "" { + direction = "both" + } + + // Track visited events + visited := make(map[string]bool) + visited[seedHex] = true + + // Process each depth level separately for BFS semantics + for depth := 1; depth <= maxDepth; depth++ { + newEventsAtDepth := 0 + + // Get events at current depth + visitedList := make([]string, 0, len(visited)) + for id := range visited { + visitedList = append(visitedList, id) + } + + // Process inbound references (events that reference the seed or its children) + if direction == "both" || direction == "inbound" { + inboundEvents, err := n.getInboundReferencesAtDepth(ctx, seedHex, depth, visitedList) + if err != nil { + n.Logger.Warningf("TraverseThread: error getting inbound refs at depth %d: %v", depth, err) + } else { + for _, eventID := range inboundEvents { + if !visited[eventID] { + visited[eventID] = true + result.AddEventAtDepth(eventID, depth) + newEventsAtDepth++ + } + } + } + } + + // Process outbound references (events that the seed or its children reference) + if direction == "both" || direction == "outbound" { + outboundEvents, err := n.getOutboundReferencesAtDepth(ctx, seedHex, depth, visitedList) + if err != nil { + n.Logger.Warningf("TraverseThread: error getting outbound refs at depth %d: %v", depth, err) + } else { + for _, eventID := range outboundEvents { + if !visited[eventID] { + visited[eventID] = true + result.AddEventAtDepth(eventID, depth) + newEventsAtDepth++ + } + } + } + } + + n.Logger.Debugf("TraverseThread: depth %d found %d new events", depth, newEventsAtDepth) + + // Early termination if no new events found at this depth + if newEventsAtDepth == 0 { + break + } + } + + n.Logger.Debugf("TraverseThread: completed with %d total events", result.TotalEvents) + + return result, nil +} + +// getInboundReferencesAtDepth finds events that reference the seed event at exactly the given depth. +// Uses variable-length path patterns to find events N hops away. +func (n *N) getInboundReferencesAtDepth(ctx context.Context, seedID string, depth int, visited []string) ([]string, error) { + // Query for events at exactly this depth that haven't been seen yet + // Direction: (referencing_event)-[:REFERENCES]->(seed) + // At depth 1: direct replies + // At depth 2: replies to replies, etc. + cypher := fmt.Sprintf(` + MATCH path = (ref:Event)-[:REFERENCES*%d]->(seed:Event {id: $seed}) + WHERE ref.id <> $seed + AND NOT ref.id IN $visited + RETURN DISTINCT ref.id AS event_id + `, depth) + + params := map[string]any{ + "seed": seedID, + "visited": visited, + } + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return nil, err + } + + var events []string + for result.Next(ctx) { + record := result.Record() + eventID, ok := record.Values[0].(string) + if !ok || eventID == "" { + continue + } + events = append(events, strings.ToLower(eventID)) + } + + return events, nil +} + +// getOutboundReferencesAtDepth finds events that the seed event references at exactly the given depth. +// Uses variable-length path patterns to find events N hops away. +func (n *N) getOutboundReferencesAtDepth(ctx context.Context, seedID string, depth int, visited []string) ([]string, error) { + // Query for events at exactly this depth that haven't been seen yet + // Direction: (seed)-[:REFERENCES]->(referenced_event) + // At depth 1: direct parents/quotes + // At depth 2: grandparents, etc. + cypher := fmt.Sprintf(` + MATCH path = (seed:Event {id: $seed})-[:REFERENCES*%d]->(ref:Event) + WHERE ref.id <> $seed + AND NOT ref.id IN $visited + RETURN DISTINCT ref.id AS event_id + `, depth) + + params := map[string]any{ + "seed": seedID, + "visited": visited, + } + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return nil, err + } + + var events []string + for result.Next(ctx) { + record := result.Record() + eventID, ok := record.Values[0].(string) + if !ok || eventID == "" { + continue + } + events = append(events, strings.ToLower(eventID)) + } + + return events, nil +} + +// TraverseThreadFromHex is a convenience wrapper that accepts hex-encoded event ID. +func (n *N) TraverseThreadFromHex(seedEventIDHex string, maxDepth int, direction string) (*GraphResult, error) { + seedEventID, err := hex.Dec(seedEventIDHex) + if err != nil { + return nil, err + } + result, err := n.TraverseThread(seedEventID, maxDepth, direction) + if err != nil { + return nil, err + } + return result.(*GraphResult), nil +} + +// GetThreadReplies finds all direct replies to an event. +// This is a convenience method that returns events at depth 1 with inbound direction. +func (n *N) GetThreadReplies(eventID []byte, kinds []uint16) (*GraphResult, error) { + result := NewGraphResult() + + if len(eventID) != 32 { + return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(eventID)) + } + + eventIDHex := strings.ToLower(hex.Enc(eventID)) + ctx := context.Background() + + // Build kinds filter if specified + var kindsFilter string + params := map[string]any{ + "eventId": eventIDHex, + } + + if len(kinds) > 0 { + kindsInt := make([]int64, len(kinds)) + for i, k := range kinds { + kindsInt[i] = int64(k) + } + params["kinds"] = kindsInt + kindsFilter = "AND reply.kind IN $kinds" + } + + // Query for direct replies + cypher := fmt.Sprintf(` + MATCH (reply:Event)-[:REFERENCES]->(e:Event {id: $eventId}) + WHERE true %s + RETURN reply.id AS event_id + ORDER BY reply.created_at DESC + `, kindsFilter) + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return result, fmt.Errorf("failed to query replies: %w", err) + } + + for queryResult.Next(ctx) { + record := queryResult.Record() + replyID, ok := record.Values[0].(string) + if !ok || replyID == "" { + continue + } + result.AddEventAtDepth(strings.ToLower(replyID), 1) + } + + return result, nil +} + +// GetThreadParents finds events that a given event references (its parents/quotes). +func (n *N) GetThreadParents(eventID []byte) (*GraphResult, error) { + result := NewGraphResult() + + if len(eventID) != 32 { + return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(eventID)) + } + + eventIDHex := strings.ToLower(hex.Enc(eventID)) + ctx := context.Background() + + params := map[string]any{ + "eventId": eventIDHex, + } + + // Query for events that this event references + cypher := ` + MATCH (e:Event {id: $eventId})-[:REFERENCES]->(parent:Event) + RETURN parent.id AS event_id + ORDER BY parent.created_at ASC + ` + + queryResult, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return result, fmt.Errorf("failed to query parents: %w", err) + } + + for queryResult.Next(ctx) { + record := queryResult.Record() + parentID, ok := record.Values[0].(string) + if !ok || parentID == "" { + continue + } + result.AddEventAtDepth(strings.ToLower(parentID), 1) + } + + return result, nil +} diff --git a/pkg/version/version b/pkg/version/version index a37f0fe..8a45319 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.34.7 \ No newline at end of file +v0.35.0 \ No newline at end of file