From 9d6280eab178b92cf6563da615ea371c2d5fd1ce Mon Sep 17 00:00:00 2001 From: mleku Date: Tue, 16 Dec 2025 10:13:15 +0100 Subject: [PATCH] Fix duplicate REPORTS relationships in Neo4j backend (v0.36.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change processReport() to use MERGE instead of CREATE for REPORTS relationships, deduplicating by (reporter, reported, report_type) - Add ON CREATE/ON MATCH clauses to preserve newest event data while preventing duplicate relationships - Add getExistingReportEvent() helper to check for existing reports - Add markReportEventSuperseded() to track superseded events - Add v4 migration migrateDeduplicateReports() to clean up existing duplicate REPORTS relationships in databases - Add comprehensive tests: TestReportDeduplication with subtests for deduplication, different types, and superseded event tracking - Update WOT_SPEC.md with REPORTS deduplication behavior and correct property names (report_type, created_at, created_by_event) - Bump version to v0.36.1 Fixes: https://git.nostrdev.com/mleku/next.orly.dev/issues/16 Files modified: - pkg/neo4j/social-event-processor.go: MERGE-based deduplication - pkg/neo4j/migrations.go: v4 migration for duplicate cleanup - pkg/neo4j/social-event-processor_test.go: Deduplication tests - pkg/neo4j/WOT_SPEC.md: Updated REPORTS documentation - pkg/version/version: Bump to v0.36.1 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pkg/neo4j/WOT_SPEC.md | 17 +- pkg/neo4j/migrations.go | 103 +++++++++ pkg/neo4j/social-event-processor.go | 86 +++++++- pkg/neo4j/social-event-processor_test.go | 261 +++++++++++++++++++++++ pkg/version/version | 2 +- 5 files changed, 452 insertions(+), 17 deletions(-) diff --git a/pkg/neo4j/WOT_SPEC.md b/pkg/neo4j/WOT_SPEC.md index 13e5b5a..6b95bc5 100644 --- a/pkg/neo4j/WOT_SPEC.md +++ b/pkg/neo4j/WOT_SPEC.md @@ -185,11 +185,18 @@ Represents a report filed against a user (derived from kind 1984 events). **Direction:** `(reporter:NostrUser)-[:REPORTS]->(reported:NostrUser)` -**Properties:** -- `reportType` (string) - NIP-56 report type (impersonation, spam, illegal, malware, nsfw, etc.) -- `timestamp` (integer) - When the report was filed +**Deduplication:** Only one REPORTS relationship exists per (reporter, reported, report_type) combination. +Multiple reports of the same type from the same user to the same target update the existing +relationship with the most recent event's data. This prevents double-counting in GrapeRank +calculations while maintaining audit trails via ProcessedSocialEvent nodes. -**Source:** Created from kind 1984 (reporting) events +**Properties:** +- `report_type` (string) - NIP-56 report type (impersonation, spam, illegal, malware, nsfw, etc.) +- `created_at` (integer) - Timestamp of the most recent report event +- `created_by_event` (string) - Event ID of the most recent report +- `relay_received_at` (integer) - When the relay first received any report of this type + +**Source:** Created/updated from kind 1984 (reporting) events #### 4. WOT_METRICS_CARDS @@ -221,7 +228,7 @@ The WoT model processes the following Nostr event kinds: |------|------|---------|--------------| | 0 | Profile Metadata | User profile information | Update NostrUser properties (npub, name, etc.) 
| | 3 | Contact List | Follow list | Create/update FOLLOWS relationships | -| 1984 | Reporting | Report users/content | Create REPORTS relationships with reportType | +| 1984 | Reporting | Report users/content | Create/update REPORTS relationships (deduplicated by report_type) | | 10000 | Mute List | Mute list | Create/update MUTES relationships | | 30382 | Trusted Assertion (NIP-85) | Published trust metrics | Create/update NostrUserWotMetricsCard nodes | diff --git a/pkg/neo4j/migrations.go b/pkg/neo4j/migrations.go index 1ee9f8d..f8e7c30 100644 --- a/pkg/neo4j/migrations.go +++ b/pkg/neo4j/migrations.go @@ -30,6 +30,11 @@ var migrations = []Migration{ Description: "Convert direct REFERENCES/MENTIONS relationships to Tag-based model", Migrate: migrateToTagBasedReferences, }, + { + Version: "v4", + Description: "Deduplicate REPORTS relationships by (reporter, reported, report_type)", + Migrate: migrateDeduplicateReports, + }, } // RunMigrations executes all pending migrations @@ -492,3 +497,101 @@ func migrateToTagBasedReferences(ctx context.Context, n *N) error { n.Logger.Infof("Tag-based references migration completed successfully") return nil } + +// migrateDeduplicateReports removes duplicate REPORTS relationships +// Prior to this migration, processReport() used CREATE which allowed multiple +// REPORTS relationships with the same report_type between the same two users. +// This migration keeps only the most recent report (by created_at) for each +// (reporter, reported, report_type) combination. +func migrateDeduplicateReports(ctx context.Context, n *N) error { + // Step 1: Count duplicate REPORTS relationships + // Duplicates are defined as multiple REPORTS with the same (reporter, reported, report_type) + countDuplicatesCypher := ` + MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser) + WITH reporter, reported, r.report_type AS type, collect(r) AS rels + WHERE size(rels) > 1 + RETURN sum(size(rels) - 1) AS duplicate_count + ` + result, err := n.ExecuteRead(ctx, countDuplicatesCypher, nil) + if err != nil { + return fmt.Errorf("failed to count duplicate REPORTS: %w", err) + } + + var duplicateCount int64 + if result.Next(ctx) { + if count, ok := result.Record().Values[0].(int64); ok { + duplicateCount = count + } + } + + if duplicateCount == 0 { + n.Logger.Infof("no duplicate REPORTS relationships found, migration complete") + return nil + } + + n.Logger.Infof("found %d duplicate REPORTS relationships to remove", duplicateCount) + + // Step 2: Delete duplicate REPORTS, keeping the one with the highest created_at + // This query: + // 1. Groups REPORTS by (reporter, reported, report_type) + // 2. Finds the maximum created_at for each group + // 3. 
Deletes all relationships in the group except the newest one + deleteDuplicatesCypher := ` + MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser) + WITH reporter, reported, r.report_type AS type, + collect(r) AS rels, max(r.created_at) AS maxCreatedAt + WHERE size(rels) > 1 + UNWIND rels AS rel + WITH rel, maxCreatedAt + WHERE rel.created_at < maxCreatedAt + DELETE rel + RETURN count(*) AS deleted + ` + + writeResult, err := n.ExecuteWrite(ctx, deleteDuplicatesCypher, nil) + if err != nil { + return fmt.Errorf("failed to delete duplicate REPORTS: %w", err) + } + + var deletedCount int64 + if writeResult.Next(ctx) { + if count, ok := writeResult.Record().Values[0].(int64); ok { + deletedCount = count + } + } + + n.Logger.Infof("deleted %d duplicate REPORTS relationships", deletedCount) + + // Step 3: Mark superseded ProcessedSocialEvent nodes for deleted reports + // Find ProcessedSocialEvent nodes (kind 1984) whose event IDs are no longer + // referenced by any REPORTS relationship's created_by_event + markSupersededCypher := ` + MATCH (evt:ProcessedSocialEvent {event_kind: 1984}) + WHERE evt.superseded_by IS NULL + AND NOT EXISTS { + MATCH ()-[r:REPORTS]->() + WHERE r.created_by_event = evt.event_id + } + SET evt.superseded_by = 'migration_v4_dedupe' + RETURN count(evt) AS superseded + ` + + markResult, err := n.ExecuteWrite(ctx, markSupersededCypher, nil) + if err != nil { + // Non-fatal - just log warning + n.Logger.Warningf("failed to mark superseded ProcessedSocialEvent nodes: %v", err) + } else { + var supersededCount int64 + if markResult.Next(ctx) { + if count, ok := markResult.Record().Values[0].(int64); ok { + supersededCount = count + } + } + if supersededCount > 0 { + n.Logger.Infof("marked %d ProcessedSocialEvent nodes as superseded", supersededCount) + } + } + + n.Logger.Infof("REPORTS deduplication migration completed successfully") + return nil +} diff --git a/pkg/neo4j/social-event-processor.go b/pkg/neo4j/social-event-processor.go index f8a350a..d781b0d 100644 --- a/pkg/neo4j/social-event-processor.go +++ b/pkg/neo4j/social-event-processor.go @@ -211,6 +211,8 @@ func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E) } // processReport handles kind 1984 events (reports) +// Deduplicates by (reporter, reported, report_type) - only one REPORTS relationship +// per combination, with the most recent event's data preserved. 
func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error { reporterPubkey := hex.Enc(ev.Pubkey[:]) eventID := hex.Enc(ev.ID[:]) @@ -236,8 +238,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e return nil } - // Create REPORTS relationship - // Note: WITH is required between CREATE and MERGE in Cypher + // Check for existing report of the same type to determine if this is an update + existingEventID, err := p.getExistingReportEvent(ctx, reporterPubkey, reportedPubkey, reportType) + if err != nil { + return fmt.Errorf("failed to check existing report: %w", err) + } + + // Create REPORTS relationship with MERGE to deduplicate + // MERGE on (reporter, reported, report_type) ensures only one relationship per combination cypher := ` // Create event tracking node CREATE (evt:ProcessedSocialEvent { @@ -257,13 +265,18 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e MERGE (reporter:NostrUser {pubkey: $reporter_pubkey}) MERGE (reported:NostrUser {pubkey: $reported_pubkey}) - // Create REPORTS relationship - CREATE (reporter)-[:REPORTS { - created_by_event: $event_id, - created_at: $created_at, - relay_received_at: timestamp(), - report_type: $report_type - }]->(reported) + // MERGE on (reporter, reported, report_type) - deduplicate! + MERGE (reporter)-[r:REPORTS {report_type: $report_type}]->(reported) + ON CREATE SET + r.created_by_event = $event_id, + r.created_at = $created_at, + r.relay_received_at = timestamp() + ON MATCH SET + // Only update if this event is newer + r.created_by_event = CASE WHEN $created_at > r.created_at + THEN $event_id ELSE r.created_by_event END, + r.created_at = CASE WHEN $created_at > r.created_at + THEN $created_at ELSE r.created_at END ` params := map[string]any{ @@ -274,9 +287,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e "report_type": reportType, } - _, err := p.db.ExecuteWrite(ctx, cypher, params) + _, err = p.db.ExecuteWrite(ctx, cypher, params) if err != nil { - return fmt.Errorf("failed to create report: %w", err) + return fmt.Errorf("failed to create/update report: %w", err) + } + + // Mark old ProcessedSocialEvent as superseded if this is an update with newer data + if existingEventID != "" && existingEventID != eventID { + p.markReportEventSuperseded(ctx, existingEventID, eventID) } p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s", @@ -285,6 +303,52 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e return nil } +// getExistingReportEvent checks if a REPORTS relationship already exists for this combination +// Returns the event ID that created the relationship, or empty string if none exists +func (p *SocialEventProcessor) getExistingReportEvent(ctx context.Context, reporterPubkey, reportedPubkey, reportType string) (string, error) { + cypher := ` + MATCH (reporter:NostrUser {pubkey: $reporter_pubkey})-[r:REPORTS {report_type: $report_type}]->(reported:NostrUser {pubkey: $reported_pubkey}) + RETURN r.created_by_event AS event_id + LIMIT 1 + ` + + params := map[string]any{ + "reporter_pubkey": reporterPubkey, + "reported_pubkey": reportedPubkey, + "report_type": reportType, + } + + result, err := p.db.ExecuteRead(ctx, cypher, params) + if err != nil { + return "", err + } + + if result.Next(ctx) { + record := result.Record() + if eventID, ok := record.Values[0].(string); ok { + return eventID, nil + } + } + + return "", nil +} + +// markReportEventSuperseded marks 
an older ProcessedSocialEvent as superseded by a newer one +func (p *SocialEventProcessor) markReportEventSuperseded(ctx context.Context, oldEventID, newEventID string) { + cypher := ` + MATCH (old:ProcessedSocialEvent {event_id: $old_event_id, event_kind: 1984}) + SET old.superseded_by = $new_event_id + ` + + params := map[string]any{ + "old_event_id": oldEventID, + "new_event_id": newEventID, + } + + // Ignore errors - old event may not exist + p.db.ExecuteWrite(ctx, cypher, params) +} + // UpdateContactListParams holds parameters for contact list graph update type UpdateContactListParams struct { AuthorPubkey string diff --git a/pkg/neo4j/social-event-processor_test.go b/pkg/neo4j/social-event-processor_test.go index e9dcbfc..4516b86 100644 --- a/pkg/neo4j/social-event-processor_test.go +++ b/pkg/neo4j/social-event-processor_test.go @@ -737,3 +737,264 @@ func BenchmarkDiffComputation(b *testing.B) { _, _ = diffStringSlices(old, new) } } + +// TestReportDeduplication tests that duplicate REPORTS are deduplicated +func TestReportDeduplication(t *testing.T) { + if testDB == nil { + t.Skip("Neo4j not available") + } + + ctx := context.Background() + + t.Run("DeduplicateSameType", func(t *testing.T) { + // Clean database for this subtest + cleanTestDatabase() + + reporter := generateTestKeypair(t, "reporter") + reported := generateTestKeypair(t, "reported") + + reporterPubkey := hex.Enc(reporter.pubkey[:]) + reportedPubkey := hex.Enc(reported.pubkey[:]) + + // Create first report (older timestamp) + ev1 := event.New() + ev1.Pubkey = reporter.pubkey + ev1.CreatedAt = 1000 + ev1.Kind = 1984 + ev1.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "impersonation"), + ) + ev1.Content = []byte("First report") + + if err := ev1.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign first event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev1); err != nil { + t.Fatalf("Failed to save first report: %v", err) + } + + // Create second report (newer timestamp, same type) + ev2 := event.New() + ev2.Pubkey = reporter.pubkey + ev2.CreatedAt = 2000 // Newer timestamp + ev2.Kind = 1984 + ev2.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "impersonation"), + ) + ev2.Content = []byte("Second report") + + if err := ev2.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign second event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev2); err != nil { + t.Fatalf("Failed to save second report: %v", err) + } + + // Verify only ONE REPORTS relationship exists + cypher := ` + MATCH (r:NostrUser {pubkey: $reporter})-[rel:REPORTS]->(d:NostrUser {pubkey: $reported}) + RETURN count(rel) AS count, rel.created_at AS created_at, rel.created_by_event AS event_id + ` + params := map[string]any{ + "reporter": reporterPubkey, + "reported": reportedPubkey, + } + + result, err := testDB.ExecuteRead(ctx, cypher, params) + if err != nil { + t.Fatalf("Failed to query REPORTS: %v", err) + } + + if !result.Next(ctx) { + t.Fatal("No REPORTS relationship found") + } + + record := result.Record() + count := record.Values[0].(int64) + createdAt := record.Values[1].(int64) + eventID := record.Values[2].(string) + + if count != 1 { + t.Errorf("Expected 1 REPORTS relationship, got %d", count) + } + + // Verify the relationship has the newer event's data + if createdAt != 2000 { + t.Errorf("Expected created_at=2000 (newer), got %d", createdAt) + } + + ev2ID := hex.Enc(ev2.ID[:]) + if eventID != ev2ID { + t.Errorf("Expected event_id=%s, got %s", ev2ID, eventID) + } + + t.Log("✓ Duplicate reports 
correctly deduplicated to single relationship with newest data") + }) + + t.Run("DifferentTypesAllowed", func(t *testing.T) { + // Clean database for this subtest + cleanTestDatabase() + + reporter := generateTestKeypair(t, "reporter2") + reported := generateTestKeypair(t, "reported2") + + reporterPubkey := hex.Enc(reporter.pubkey[:]) + reportedPubkey := hex.Enc(reported.pubkey[:]) + + // Report for impersonation + ev1 := event.New() + ev1.Pubkey = reporter.pubkey + ev1.CreatedAt = 1000 + ev1.Kind = 1984 + ev1.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "impersonation"), + ) + + if err := ev1.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev1); err != nil { + t.Fatalf("Failed to save report: %v", err) + } + + // Report for spam (different type) + ev2 := event.New() + ev2.Pubkey = reporter.pubkey + ev2.CreatedAt = 2000 + ev2.Kind = 1984 + ev2.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "spam"), + ) + + if err := ev2.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev2); err != nil { + t.Fatalf("Failed to save report: %v", err) + } + + // Verify TWO REPORTS relationships exist (different types) + cypher := ` + MATCH (r:NostrUser {pubkey: $reporter})-[rel:REPORTS]->(d:NostrUser {pubkey: $reported}) + RETURN rel.report_type AS type ORDER BY type + ` + params := map[string]any{ + "reporter": reporterPubkey, + "reported": reportedPubkey, + } + + result, err := testDB.ExecuteRead(ctx, cypher, params) + if err != nil { + t.Fatalf("Failed to query REPORTS: %v", err) + } + + var types []string + for result.Next(ctx) { + types = append(types, result.Record().Values[0].(string)) + } + + if len(types) != 2 { + t.Errorf("Expected 2 REPORTS relationships, got %d", len(types)) + } + + if len(types) >= 2 && (types[0] != "impersonation" || types[1] != "spam") { + t.Errorf("Expected [impersonation, spam], got %v", types) + } + + t.Log("✓ Different report types correctly create separate relationships") + }) + + t.Run("SupersededEventTracking", func(t *testing.T) { + // Clean database for this subtest + cleanTestDatabase() + + reporter := generateTestKeypair(t, "reporter3") + reported := generateTestKeypair(t, "reported3") + + reporterPubkey := hex.Enc(reporter.pubkey[:]) + reportedPubkey := hex.Enc(reported.pubkey[:]) + + // Create first report + ev1 := event.New() + ev1.Pubkey = reporter.pubkey + ev1.CreatedAt = 1000 + ev1.Kind = 1984 + ev1.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "spam"), + ) + + if err := ev1.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign first event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev1); err != nil { + t.Fatalf("Failed to save first report: %v", err) + } + + ev1ID := hex.Enc(ev1.ID[:]) + + // Create second report (supersedes first) + ev2 := event.New() + ev2.Pubkey = reporter.pubkey + ev2.CreatedAt = 2000 + ev2.Kind = 1984 + ev2.Tags = tag.NewS( + tag.NewFromAny("p", reportedPubkey, "spam"), + ) + + if err := ev2.Sign(reporter.signer); err != nil { + t.Fatalf("Failed to sign second event: %v", err) + } + + if _, err := testDB.SaveEvent(ctx, ev2); err != nil { + t.Fatalf("Failed to save second report: %v", err) + } + + ev2ID := hex.Enc(ev2.ID[:]) + + // Verify first ProcessedSocialEvent is superseded + cypher := ` + MATCH (evt:ProcessedSocialEvent {event_id: $event_id, event_kind: 1984}) + RETURN evt.superseded_by AS superseded_by + ` + params := 
map[string]any{"event_id": ev1ID} + + result, err := testDB.ExecuteRead(ctx, cypher, params) + if err != nil { + t.Fatalf("Failed to query ProcessedSocialEvent: %v", err) + } + + if !result.Next(ctx) { + t.Fatal("First ProcessedSocialEvent not found") + } + + supersededBy := result.Record().Values[0] + if supersededBy == nil { + t.Error("Expected first event to be superseded, but superseded_by is null") + } else if supersededBy.(string) != ev2ID { + t.Errorf("Expected superseded_by=%s, got %v", ev2ID, supersededBy) + } + + // Verify second ProcessedSocialEvent is NOT superseded + params = map[string]any{"event_id": ev2ID} + result, err = testDB.ExecuteRead(ctx, cypher, params) + if err != nil { + t.Fatalf("Failed to query second ProcessedSocialEvent: %v", err) + } + + if !result.Next(ctx) { + t.Fatal("Second ProcessedSocialEvent not found") + } + + supersededBy = result.Record().Values[0] + if supersededBy != nil { + t.Errorf("Expected second event not to be superseded, but superseded_by=%v", supersededBy) + } + + t.Log("✓ ProcessedSocialEvent correctly tracks superseded events") + }) +} diff --git a/pkg/version/version b/pkg/version/version index e1d6235..303cd19 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.36.0 +v0.36.1