Files
next.orly.dev/pkg/neo4j/graph-thread.go
mleku ba84e12ea9
Some checks failed
Go / build-and-release (push) Has been cancelled
Add _graph extension support to Neo4j driver
- Implement TraverseFollows using Cypher path queries on FOLLOWS relationships
- Implement TraverseFollowers using reverse path traversal
- Implement FindMentions using MENTIONS relationships from p-tags
- Implement TraverseThread using REFERENCES relationships from e-tags
  with bidirectional traversal (inbound replies, outbound parents)
- Add GraphAdapter to bridge Neo4j to graph.GraphDatabase interface
- Add GraphResult type implementing graph.GraphResultI for Neo4j
- Initialize graph executor for Neo4j backend in app/main.go

The implementation uses existing Neo4j schema and relationships created
by SaveEvent() - no schema changes required. The _graph extension now
works transparently with either Badger or Neo4j backends.

Bump version to v0.35.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 07:07:31 +01:00

278 lines
7.9 KiB
Go

package neo4j
import (
"context"
"fmt"
"strings"
"git.mleku.dev/mleku/nostr/encoders/hex"
"next.orly.dev/pkg/protocol/graph"
)
// TraverseThread walks the thread structure around a seed event, one depth
// level at a time, following the REFERENCES relationships derived from
// e-tags.
//
// Supported directions:
//   - "inbound": events whose REFERENCES chain leads to the seed (replies,
//     reactions, reposts).
//   - "outbound": events the seed leads to along REFERENCES (parents, quoted
//     posts).
//   - "both": runs both lookups at every level. An empty direction is treated
//     as "both".
//
// Any other direction value runs neither lookup, so the result stays empty.
// Each event is recorded once, at the first depth where it appears. The walk
// stops early as soon as a level contributes no new events. A lookup that
// fails at some level is logged as a warning and skipped; it is not returned
// as an error.
//
// Note: REFERENCES relationships are only created if the referenced event
// existed in the database when the referencing event was saved, so some
// references may be missing if events were stored out of order.
//
// Parameters:
//   - seedEventID: raw event ID to start from; must be exactly 32 bytes
//   - maxDepth: deepest level to explore (values below 1 give an empty result)
//   - direction: "both" (default), "inbound", or "outbound"
func (n *N) TraverseThread(seedEventID []byte, maxDepth int, direction string) (graph.GraphResultI, error) {
	result := NewGraphResult()
	if len(seedEventID) != 32 {
		return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(seedEventID))
	}

	seedHex := strings.ToLower(hex.Enc(seedEventID))
	ctx := context.Background()

	if direction == "" {
		direction = "both"
	}
	wantInbound := direction == "both" || direction == "inbound"
	wantOutbound := direction == "both" || direction == "outbound"

	// The seed counts as already seen so it is never reported as its own
	// neighbour.
	seen := map[string]bool{seedHex: true}

	for depth := 1; depth <= maxDepth; depth++ {
		// Both lookups at this level exclude the same snapshot of seen IDs,
		// taken before either of them adds anything.
		exclude := make([]string, 0, len(seen))
		for id := range seen {
			exclude = append(exclude, id)
		}

		added := 0
		absorb := func(ids []string) {
			for _, id := range ids {
				if seen[id] {
					continue
				}
				seen[id] = true
				result.AddEventAtDepth(id, depth)
				added++
			}
		}

		if wantInbound {
			if ids, err := n.getInboundReferencesAtDepth(ctx, seedHex, depth, exclude); err != nil {
				n.Logger.Warningf("TraverseThread: error getting inbound refs at depth %d: %v", depth, err)
			} else {
				absorb(ids)
			}
		}

		if wantOutbound {
			if ids, err := n.getOutboundReferencesAtDepth(ctx, seedHex, depth, exclude); err != nil {
				n.Logger.Warningf("TraverseThread: error getting outbound refs at depth %d: %v", depth, err)
			} else {
				absorb(ids)
			}
		}

		n.Logger.Debugf("TraverseThread: depth %d found %d new events", depth, added)
		if added == 0 {
			// Nothing new at this level, so deeper levels are not explored.
			break
		}
	}

	n.Logger.Debugf("TraverseThread: completed with %d total events", result.TotalEvents)
	return result, nil
}
// getInboundReferencesAtDepth returns the IDs of events that reach seedID
// through a chain of exactly depth REFERENCES relationships, i.e.
// (ref)-[:REFERENCES*depth]->(seed).
//
// The seed itself and every ID listed in visited are excluded. Depth 1 yields
// direct replies, depth 2 replies to replies, and so on. IDs are returned
// lower-cased. Rows whose first column is not a non-empty string are skipped.
// The result is nil when nothing matches.
func (n *N) getInboundReferencesAtDepth(ctx context.Context, seedID string, depth int, visited []string) ([]string, error) {
	query := fmt.Sprintf(`
MATCH path = (ref:Event)-[:REFERENCES*%d]->(seed:Event {id: $seed})
WHERE ref.id <> $seed
AND NOT ref.id IN $visited
RETURN DISTINCT ref.id AS event_id
`, depth)

	rows, err := n.ExecuteRead(ctx, query, map[string]any{
		"seed":    seedID,
		"visited": visited,
	})
	if err != nil {
		return nil, err
	}

	var ids []string
	for rows.Next(ctx) {
		if id, ok := rows.Record().Values[0].(string); ok && id != "" {
			ids = append(ids, strings.ToLower(id))
		}
	}
	return ids, nil
}
// getOutboundReferencesAtDepth returns the IDs of events that seedID reaches
// through a chain of exactly depth REFERENCES relationships, i.e.
// (seed)-[:REFERENCES*depth]->(ref).
//
// The seed itself and every ID listed in visited are excluded. Depth 1 yields
// the events the seed directly references (parents, quoted posts), depth 2
// the events those reference, and so on. IDs are returned lower-cased. Rows
// whose first column is not a non-empty string are ignored. The result is nil
// when nothing matches.
func (n *N) getOutboundReferencesAtDepth(ctx context.Context, seedID string, depth int, visited []string) ([]string, error) {
	params := map[string]any{"seed": seedID, "visited": visited}
	cypher := fmt.Sprintf(`
MATCH path = (seed:Event {id: $seed})-[:REFERENCES*%d]->(ref:Event)
WHERE ref.id <> $seed
AND NOT ref.id IN $visited
RETURN DISTINCT ref.id AS event_id
`, depth)

	res, err := n.ExecuteRead(ctx, cypher, params)
	if err != nil {
		return nil, err
	}

	var found []string
	for res.Next(ctx) {
		raw := res.Record().Values[0]
		id, isString := raw.(string)
		if !isString || id == "" {
			continue
		}
		found = append(found, strings.ToLower(id))
	}
	return found, nil
}
// TraverseThreadFromHex is a convenience wrapper around TraverseThread. It
// accepts a hex-encoded seed event ID and returns the concrete *GraphResult.
//
// A hex decoding failure is returned wrapped with the offending input.
// Errors from TraverseThread, including a decoded ID that is not 32 bytes,
// are passed through unchanged. On any error the returned result is nil.
func (n *N) TraverseThreadFromHex(seedEventIDHex string, maxDepth int, direction string) (*GraphResult, error) {
	seedEventID, err := hex.Dec(seedEventIDHex)
	if err != nil {
		return nil, fmt.Errorf("decoding seed event ID %q: %w", seedEventIDHex, err)
	}

	result, err := n.TraverseThread(seedEventID, maxDepth, direction)
	if err != nil {
		return nil, err
	}

	// TraverseThread's signature only promises graph.GraphResultI. Use a
	// checked assertion so an unexpected implementation, or a nil interface,
	// yields an error instead of a panic.
	graphResult, ok := result.(*GraphResult)
	if !ok {
		return nil, fmt.Errorf("unexpected graph result type %T", result)
	}
	return graphResult, nil
}
// GetThreadReplies finds the events that directly reference eventID through a
// REFERENCES relationship, i.e. the depth-1 inbound neighbours. They are
// returned newest first (by created_at) and all recorded at depth 1.
//
// If kinds is non-empty, only replies whose kind is in kinds are returned.
// Each reply is reported at most once, even if several REFERENCES
// relationships link it to eventID. On error, an empty result is returned
// together with the error.
func (n *N) GetThreadReplies(eventID []byte, kinds []uint16) (*GraphResult, error) {
	result := NewGraphResult()
	if len(eventID) != 32 {
		return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(eventID))
	}

	eventIDHex := strings.ToLower(hex.Enc(eventID))
	ctx := context.Background()

	params := map[string]any{
		"eventId": eventIDHex,
	}

	// Optional kind filter. Only a fixed clause is spliced into the query;
	// the kind values themselves travel as a bound parameter.
	var kindsFilter string
	if len(kinds) > 0 {
		kindsInt := make([]int64, len(kinds))
		for i, k := range kinds {
			kindsInt[i] = int64(k)
		}
		params["kinds"] = kindsInt
		kindsFilter = "AND reply.kind IN $kinds"
	}

	// DISTINCT collapses duplicate rows produced when more than one
	// REFERENCES relationship joins the same pair of events. This matches
	// the DISTINCT used by the TraverseThread depth queries. created_at is
	// projected because ORDER BY after RETURN DISTINCT may only use returned
	// columns. event_id stays in column 0.
	cypher := fmt.Sprintf(`
MATCH (reply:Event)-[:REFERENCES]->(e:Event {id: $eventId})
WHERE true %s
RETURN DISTINCT reply.id AS event_id, reply.created_at AS created_at
ORDER BY created_at DESC
`, kindsFilter)

	queryResult, err := n.ExecuteRead(ctx, cypher, params)
	if err != nil {
		return result, fmt.Errorf("failed to query replies: %w", err)
	}

	for queryResult.Next(ctx) {
		record := queryResult.Record()
		replyID, ok := record.Values[0].(string)
		if !ok || replyID == "" {
			continue
		}
		result.AddEventAtDepth(strings.ToLower(replyID), 1)
	}
	return result, nil
}
// GetThreadParents finds the events that eventID directly references through
// a REFERENCES relationship (its parents / quoted posts). They are returned
// oldest first (by created_at) and all recorded at depth 1.
//
// Each parent is reported at most once, even if several REFERENCES
// relationships link eventID to it. On error, an empty result is returned
// together with the error.
func (n *N) GetThreadParents(eventID []byte) (*GraphResult, error) {
	result := NewGraphResult()
	if len(eventID) != 32 {
		return result, fmt.Errorf("invalid event ID length: expected 32, got %d", len(eventID))
	}

	eventIDHex := strings.ToLower(hex.Enc(eventID))
	ctx := context.Background()

	params := map[string]any{
		"eventId": eventIDHex,
	}

	// DISTINCT collapses duplicate rows produced when more than one
	// REFERENCES relationship joins the same pair of events. created_at is
	// projected because ORDER BY after RETURN DISTINCT may only use returned
	// columns. event_id stays in column 0.
	cypher := `
MATCH (e:Event {id: $eventId})-[:REFERENCES]->(parent:Event)
RETURN DISTINCT parent.id AS event_id, parent.created_at AS created_at
ORDER BY created_at ASC
`

	queryResult, err := n.ExecuteRead(ctx, cypher, params)
	if err != nil {
		return result, fmt.Errorf("failed to query parents: %w", err)
	}

	for queryResult.Next(ctx) {
		record := queryResult.Record()
		parentID, ok := record.Values[0].(string)
		if !ok || parentID == "" {
			continue
		}
		result.AddEventAtDepth(strings.ToLower(parentID), 1)
	}
	return result, nil
}