Fix duplicate REPORTS relationships in Neo4j backend (v0.36.1)
Some checks failed
Go / build-and-release (push) Has been cancelled
Some checks failed
Go / build-and-release (push) Has been cancelled
- Change processReport() to use MERGE instead of CREATE for REPORTS relationships, deduplicating by (reporter, reported, report_type) - Add ON CREATE/ON MATCH clauses to preserve newest event data while preventing duplicate relationships - Add getExistingReportEvent() helper to check for existing reports - Add markReportEventSuperseded() to track superseded events - Add v4 migration migrateDeduplicateReports() to clean up existing duplicate REPORTS relationships in databases - Add comprehensive tests: TestReportDeduplication with subtests for deduplication, different types, and superseded event tracking - Update WOT_SPEC.md with REPORTS deduplication behavior and correct property names (report_type, created_at, created_by_event) - Bump version to v0.36.1 Fixes: https://git.nostrdev.com/mleku/next.orly.dev/issues/16 Files modified: - pkg/neo4j/social-event-processor.go: MERGE-based deduplication - pkg/neo4j/migrations.go: v4 migration for duplicate cleanup - pkg/neo4j/social-event-processor_test.go: Deduplication tests - pkg/neo4j/WOT_SPEC.md: Updated REPORTS documentation - pkg/version/version: Bump to v0.36.1 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -211,6 +211,8 @@ func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E)
|
||||
}
|
||||
|
||||
// processReport handles kind 1984 events (reports)
|
||||
// Deduplicates by (reporter, reported, report_type) - only one REPORTS relationship
|
||||
// per combination, with the most recent event's data preserved.
|
||||
func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error {
|
||||
reporterPubkey := hex.Enc(ev.Pubkey[:])
|
||||
eventID := hex.Enc(ev.ID[:])
|
||||
@@ -236,8 +238,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create REPORTS relationship
|
||||
// Note: WITH is required between CREATE and MERGE in Cypher
|
||||
// Check for existing report of the same type to determine if this is an update
|
||||
existingEventID, err := p.getExistingReportEvent(ctx, reporterPubkey, reportedPubkey, reportType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check existing report: %w", err)
|
||||
}
|
||||
|
||||
// Create REPORTS relationship with MERGE to deduplicate
|
||||
// MERGE on (reporter, reported, report_type) ensures only one relationship per combination
|
||||
cypher := `
|
||||
// Create event tracking node
|
||||
CREATE (evt:ProcessedSocialEvent {
|
||||
@@ -257,13 +265,18 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
MERGE (reporter:NostrUser {pubkey: $reporter_pubkey})
|
||||
MERGE (reported:NostrUser {pubkey: $reported_pubkey})
|
||||
|
||||
// Create REPORTS relationship
|
||||
CREATE (reporter)-[:REPORTS {
|
||||
created_by_event: $event_id,
|
||||
created_at: $created_at,
|
||||
relay_received_at: timestamp(),
|
||||
report_type: $report_type
|
||||
}]->(reported)
|
||||
// MERGE on (reporter, reported, report_type) - deduplicate!
|
||||
MERGE (reporter)-[r:REPORTS {report_type: $report_type}]->(reported)
|
||||
ON CREATE SET
|
||||
r.created_by_event = $event_id,
|
||||
r.created_at = $created_at,
|
||||
r.relay_received_at = timestamp()
|
||||
ON MATCH SET
|
||||
// Only update if this event is newer
|
||||
r.created_by_event = CASE WHEN $created_at > r.created_at
|
||||
THEN $event_id ELSE r.created_by_event END,
|
||||
r.created_at = CASE WHEN $created_at > r.created_at
|
||||
THEN $created_at ELSE r.created_at END
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
@@ -274,9 +287,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
"report_type": reportType,
|
||||
}
|
||||
|
||||
_, err := p.db.ExecuteWrite(ctx, cypher, params)
|
||||
_, err = p.db.ExecuteWrite(ctx, cypher, params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create report: %w", err)
|
||||
return fmt.Errorf("failed to create/update report: %w", err)
|
||||
}
|
||||
|
||||
// Mark old ProcessedSocialEvent as superseded if this is an update with newer data
|
||||
if existingEventID != "" && existingEventID != eventID {
|
||||
p.markReportEventSuperseded(ctx, existingEventID, eventID)
|
||||
}
|
||||
|
||||
p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s",
|
||||
@@ -285,6 +303,52 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
return nil
|
||||
}
|
||||
|
||||
// getExistingReportEvent checks if a REPORTS relationship already exists for this combination
|
||||
// Returns the event ID that created the relationship, or empty string if none exists
|
||||
func (p *SocialEventProcessor) getExistingReportEvent(ctx context.Context, reporterPubkey, reportedPubkey, reportType string) (string, error) {
|
||||
cypher := `
|
||||
MATCH (reporter:NostrUser {pubkey: $reporter_pubkey})-[r:REPORTS {report_type: $report_type}]->(reported:NostrUser {pubkey: $reported_pubkey})
|
||||
RETURN r.created_by_event AS event_id
|
||||
LIMIT 1
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
"reporter_pubkey": reporterPubkey,
|
||||
"reported_pubkey": reportedPubkey,
|
||||
"report_type": reportType,
|
||||
}
|
||||
|
||||
result, err := p.db.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if result.Next(ctx) {
|
||||
record := result.Record()
|
||||
if eventID, ok := record.Values[0].(string); ok {
|
||||
return eventID, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// markReportEventSuperseded marks an older ProcessedSocialEvent as superseded by a newer one
|
||||
func (p *SocialEventProcessor) markReportEventSuperseded(ctx context.Context, oldEventID, newEventID string) {
|
||||
cypher := `
|
||||
MATCH (old:ProcessedSocialEvent {event_id: $old_event_id, event_kind: 1984})
|
||||
SET old.superseded_by = $new_event_id
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
"old_event_id": oldEventID,
|
||||
"new_event_id": newEventID,
|
||||
}
|
||||
|
||||
// Ignore errors - old event may not exist
|
||||
p.db.ExecuteWrite(ctx, cypher, params)
|
||||
}
|
||||
|
||||
// UpdateContactListParams holds parameters for contact list graph update
|
||||
type UpdateContactListParams struct {
|
||||
AuthorPubkey string
|
||||
|
||||
Reference in New Issue
Block a user