Some checks failed
Go / build-and-release (push) Has been cancelled
- Change processReport() to use MERGE instead of CREATE for REPORTS relationships, deduplicating by (reporter, reported, report_type) - Add ON CREATE/ON MATCH clauses to preserve newest event data while preventing duplicate relationships - Add getExistingReportEvent() helper to check for existing reports - Add markReportEventSuperseded() to track superseded events - Add v4 migration migrateDeduplicateReports() to clean up existing duplicate REPORTS relationships in databases - Add comprehensive tests: TestReportDeduplication with subtests for deduplication, different types, and superseded event tracking - Update WOT_SPEC.md with REPORTS deduplication behavior and correct property names (report_type, created_at, created_by_event) - Bump version to v0.36.1 Fixes: https://git.nostrdev.com/mleku/next.orly.dev/issues/16 Files modified: - pkg/neo4j/social-event-processor.go: MERGE-based deduplication - pkg/neo4j/migrations.go: v4 migration for duplicate cleanup - pkg/neo4j/social-event-processor_test.go: Deduplication tests - pkg/neo4j/WOT_SPEC.md: Updated REPORTS documentation - pkg/version/version: Bump to v0.36.1 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
598 lines
19 KiB
Go
598 lines
19 KiB
Go
package neo4j
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
)
|
|
|
|
// Migration represents a database migration with a version identifier
type Migration struct {
	// Version is the unique identifier for this migration; it is stored on
	// a Migration marker node so the migration is applied at most once.
	Version string
	// Description is a human-readable summary logged when the migration runs
	// and recorded on the marker node.
	Description string
	// Migrate performs the actual migration work against the Neo4j handle.
	Migrate func(ctx context.Context, n *N) error
}
|
|
|
|
// migrations is the ordered list of database migrations
// Migrations are applied in order and tracked via Marker nodes
//
// This list is append-only: versions already recorded in the database are
// skipped on startup, so existing entries must never be renamed, reordered,
// or removed — add new migrations at the end with a fresh version string.
var migrations = []Migration{
	{
		Version:     "v1",
		Description: "Merge Author nodes into NostrUser nodes",
		Migrate:     migrateAuthorToNostrUser,
	},
	{
		Version:     "v2",
		Description: "Clean up binary-encoded pubkeys and event IDs to lowercase hex",
		Migrate:     migrateBinaryToHex,
	},
	{
		Version:     "v3",
		Description: "Convert direct REFERENCES/MENTIONS relationships to Tag-based model",
		Migrate:     migrateToTagBasedReferences,
	},
	{
		Version:     "v4",
		Description: "Deduplicate REPORTS relationships by (reporter, reported, report_type)",
		Migrate:     migrateDeduplicateReports,
	},
}
|
|
|
|
// RunMigrations executes all pending migrations
|
|
func (n *N) RunMigrations() {
|
|
ctx := context.Background()
|
|
|
|
for _, migration := range migrations {
|
|
// Check if migration has already been applied
|
|
if n.migrationApplied(ctx, migration.Version) {
|
|
n.Logger.Infof("migration %s already applied, skipping", migration.Version)
|
|
continue
|
|
}
|
|
|
|
n.Logger.Infof("applying migration %s: %s", migration.Version, migration.Description)
|
|
|
|
if err := migration.Migrate(ctx, n); err != nil {
|
|
n.Logger.Errorf("migration %s failed: %v", migration.Version, err)
|
|
// Continue to next migration - don't fail startup
|
|
continue
|
|
}
|
|
|
|
// Mark migration as complete
|
|
if err := n.markMigrationComplete(ctx, migration.Version, migration.Description); err != nil {
|
|
n.Logger.Warningf("failed to mark migration %s as complete: %v", migration.Version, err)
|
|
}
|
|
|
|
n.Logger.Infof("migration %s completed successfully", migration.Version)
|
|
}
|
|
}
|
|
|
|
// migrationApplied checks if a migration has already been applied
|
|
func (n *N) migrationApplied(ctx context.Context, version string) bool {
|
|
cypher := `
|
|
MATCH (m:Migration {version: $version})
|
|
RETURN m.version
|
|
`
|
|
result, err := n.ExecuteRead(ctx, cypher, map[string]any{"version": version})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return result.Next(ctx)
|
|
}
|
|
|
|
// markMigrationComplete marks a migration as applied
|
|
func (n *N) markMigrationComplete(ctx context.Context, version, description string) error {
|
|
cypher := `
|
|
CREATE (m:Migration {
|
|
version: $version,
|
|
description: $description,
|
|
applied_at: timestamp()
|
|
})
|
|
`
|
|
_, err := n.ExecuteWrite(ctx, cypher, map[string]any{
|
|
"version": version,
|
|
"description": description,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// migrateAuthorToNostrUser migrates Author nodes to NostrUser nodes
// This consolidates the separate Author (NIP-01) and NostrUser (WoT) labels
// into a unified NostrUser label for the social graph.
//
// The migration proceeds in six steps: count, merge nodes, re-point
// AUTHORED_BY, re-point MENTIONS, delete orphaned Author nodes, and drop the
// obsolete Author constraint. It is safe to run when no Author nodes exist
// (returns early) and returns a wrapped error on the first failing step.
//
// NOTE(review): the relationship-migration queries below are not batched
// (unlike migrateToTagBasedReferences); on a very large database a single
// transaction over all AUTHORED_BY/MENTIONS relationships could be
// memory-heavy — confirm acceptable for expected data sizes.
func migrateAuthorToNostrUser(ctx context.Context, n *N) error {
	// Step 1: Check if there are any Author nodes to migrate
	countCypher := `MATCH (a:Author) RETURN count(a) AS count`
	countResult, err := n.ExecuteRead(ctx, countCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to count Author nodes: %w", err)
	}

	// Extract the single count value; absent or non-int64 leaves zero.
	var authorCount int64
	if countResult.Next(ctx) {
		record := countResult.Record()
		if count, ok := record.Values[0].(int64); ok {
			authorCount = count
		}
	}

	if authorCount == 0 {
		n.Logger.Infof("no Author nodes to migrate")
		return nil
	}

	n.Logger.Infof("migrating %d Author nodes to NostrUser", authorCount)

	// Step 2: For each Author node, merge into NostrUser with same pubkey
	// This uses MERGE to either match existing NostrUser or create new one
	// Then copies any relationships from Author to NostrUser
	mergeCypher := `
		// Match all Author nodes
		MATCH (a:Author)

		// For each Author, merge into NostrUser (creates if doesn't exist)
		MERGE (u:NostrUser {pubkey: a.pubkey})
		ON CREATE SET u.created_at = timestamp(), u.migrated_from_author = true

		// Return count for logging
		RETURN count(DISTINCT a) AS migrated
	`

	result, err := n.ExecuteWrite(ctx, mergeCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to merge Author nodes to NostrUser: %w", err)
	}

	// Log result (result consumption happens within the session)
	_ = result

	// Step 3: Migrate AUTHORED_BY relationships from Author to NostrUser
	// Events should now point to NostrUser instead of Author
	relationshipCypher := `
		// Find events linked to Author via AUTHORED_BY
		MATCH (e:Event)-[r:AUTHORED_BY]->(a:Author)

		// Get or create the corresponding NostrUser
		MATCH (u:NostrUser {pubkey: a.pubkey})

		// Create new relationship to NostrUser if it doesn't exist
		MERGE (e)-[:AUTHORED_BY]->(u)

		// Delete old relationship to Author
		DELETE r

		RETURN count(r) AS migrated_relationships
	`

	_, err = n.ExecuteWrite(ctx, relationshipCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to migrate AUTHORED_BY relationships: %w", err)
	}

	// Step 4: Migrate MENTIONS relationships from Author to NostrUser
	mentionsCypher := `
		// Find events with MENTIONS to Author
		MATCH (e:Event)-[r:MENTIONS]->(a:Author)

		// Get or create the corresponding NostrUser
		MATCH (u:NostrUser {pubkey: a.pubkey})

		// Create new relationship to NostrUser if it doesn't exist
		MERGE (e)-[:MENTIONS]->(u)

		// Delete old relationship to Author
		DELETE r

		RETURN count(r) AS migrated_mentions
	`

	_, err = n.ExecuteWrite(ctx, mentionsCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to migrate MENTIONS relationships: %w", err)
	}

	// Step 5: Delete orphaned Author nodes (no longer needed)
	deleteCypher := `
		// Find Author nodes with no remaining relationships
		MATCH (a:Author)
		WHERE NOT (a)<-[:AUTHORED_BY]-() AND NOT (a)<-[:MENTIONS]-()
		DETACH DELETE a
		RETURN count(a) AS deleted
	`

	_, err = n.ExecuteWrite(ctx, deleteCypher, nil)
	if err != nil {
		return fmt.Errorf("failed to delete orphaned Author nodes: %w", err)
	}

	// Step 6: Drop the old Author constraint if it exists
	dropConstraintCypher := `DROP CONSTRAINT author_pubkey_unique IF EXISTS`
	_, _ = n.ExecuteWrite(ctx, dropConstraintCypher, nil)
	// Ignore error as constraint may not exist

	n.Logger.Infof("completed Author to NostrUser migration")
	return nil
}
|
|
|
|
// migrateBinaryToHex cleans up any binary-encoded pubkeys and event IDs
|
|
// The nostr library stores e/p tag values in binary format (33 bytes with null terminator),
|
|
// but Neo4j should store them as lowercase hex strings for consistent querying.
|
|
// This migration:
|
|
// 1. Finds NostrUser nodes with invalid (non-hex) pubkeys and deletes them
|
|
// 2. Finds Event nodes with invalid pubkeys/IDs and deletes them
|
|
// 3. Finds Tag nodes (type 'e' or 'p') with invalid values and deletes them
|
|
// 4. Cleans up MENTIONS relationships pointing to invalid NostrUser nodes
|
|
func migrateBinaryToHex(ctx context.Context, n *N) error {
|
|
// Step 1: Count problematic nodes before cleanup
|
|
n.Logger.Infof("scanning for binary-encoded values in Neo4j...")
|
|
|
|
// Check for NostrUser nodes with invalid pubkeys (not 64 char hex)
|
|
// A valid hex pubkey is exactly 64 lowercase hex characters
|
|
countInvalidUsersCypher := `
|
|
MATCH (u:NostrUser)
|
|
WHERE size(u.pubkey) <> 64
|
|
OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
|
|
RETURN count(u) AS count
|
|
`
|
|
result, err := n.ExecuteRead(ctx, countInvalidUsersCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count invalid NostrUser nodes: %w", err)
|
|
}
|
|
|
|
var invalidUserCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
invalidUserCount = count
|
|
}
|
|
}
|
|
n.Logger.Infof("found %d NostrUser nodes with invalid pubkeys", invalidUserCount)
|
|
|
|
// Check for Event nodes with invalid pubkeys or IDs
|
|
countInvalidEventsCypher := `
|
|
MATCH (e:Event)
|
|
WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
|
|
OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
|
|
RETURN count(e) AS count
|
|
`
|
|
result, err = n.ExecuteRead(ctx, countInvalidEventsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count invalid Event nodes: %w", err)
|
|
}
|
|
|
|
var invalidEventCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
invalidEventCount = count
|
|
}
|
|
}
|
|
n.Logger.Infof("found %d Event nodes with invalid pubkeys or IDs", invalidEventCount)
|
|
|
|
// Check for Tag nodes (e/p type) with invalid values
|
|
countInvalidTagsCypher := `
|
|
MATCH (t:Tag)
|
|
WHERE t.type IN ['e', 'p']
|
|
AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
|
|
RETURN count(t) AS count
|
|
`
|
|
result, err = n.ExecuteRead(ctx, countInvalidTagsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count invalid Tag nodes: %w", err)
|
|
}
|
|
|
|
var invalidTagCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
invalidTagCount = count
|
|
}
|
|
}
|
|
n.Logger.Infof("found %d Tag nodes (e/p type) with invalid values", invalidTagCount)
|
|
|
|
// If nothing to clean up, we're done
|
|
if invalidUserCount == 0 && invalidEventCount == 0 && invalidTagCount == 0 {
|
|
n.Logger.Infof("no binary-encoded values found, migration complete")
|
|
return nil
|
|
}
|
|
|
|
// Step 2: Delete invalid NostrUser nodes and their relationships
|
|
if invalidUserCount > 0 {
|
|
n.Logger.Infof("deleting %d invalid NostrUser nodes...", invalidUserCount)
|
|
deleteInvalidUsersCypher := `
|
|
MATCH (u:NostrUser)
|
|
WHERE size(u.pubkey) <> 64
|
|
OR NOT u.pubkey =~ '^[0-9a-f]{64}$'
|
|
DETACH DELETE u
|
|
`
|
|
_, err = n.ExecuteWrite(ctx, deleteInvalidUsersCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete invalid NostrUser nodes: %w", err)
|
|
}
|
|
n.Logger.Infof("deleted %d invalid NostrUser nodes", invalidUserCount)
|
|
}
|
|
|
|
// Step 3: Delete invalid Event nodes and their relationships
|
|
if invalidEventCount > 0 {
|
|
n.Logger.Infof("deleting %d invalid Event nodes...", invalidEventCount)
|
|
deleteInvalidEventsCypher := `
|
|
MATCH (e:Event)
|
|
WHERE (size(e.pubkey) <> 64 OR NOT e.pubkey =~ '^[0-9a-f]{64}$')
|
|
OR (size(e.id) <> 64 OR NOT e.id =~ '^[0-9a-f]{64}$')
|
|
DETACH DELETE e
|
|
`
|
|
_, err = n.ExecuteWrite(ctx, deleteInvalidEventsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete invalid Event nodes: %w", err)
|
|
}
|
|
n.Logger.Infof("deleted %d invalid Event nodes", invalidEventCount)
|
|
}
|
|
|
|
// Step 4: Delete invalid Tag nodes (e/p type) and their relationships
|
|
if invalidTagCount > 0 {
|
|
n.Logger.Infof("deleting %d invalid Tag nodes...", invalidTagCount)
|
|
deleteInvalidTagsCypher := `
|
|
MATCH (t:Tag)
|
|
WHERE t.type IN ['e', 'p']
|
|
AND (size(t.value) <> 64 OR NOT t.value =~ '^[0-9a-f]{64}$')
|
|
DETACH DELETE t
|
|
`
|
|
_, err = n.ExecuteWrite(ctx, deleteInvalidTagsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete invalid Tag nodes: %w", err)
|
|
}
|
|
n.Logger.Infof("deleted %d invalid Tag nodes", invalidTagCount)
|
|
}
|
|
|
|
// Step 5: Clean up any orphaned MENTIONS/REFERENCES relationships
|
|
// These would be relationships pointing to nodes we just deleted
|
|
cleanupOrphanedCypher := `
|
|
// Clean up any ProcessedSocialEvent nodes with invalid pubkeys
|
|
MATCH (p:ProcessedSocialEvent)
|
|
WHERE size(p.pubkey) <> 64
|
|
OR NOT p.pubkey =~ '^[0-9a-f]{64}$'
|
|
DETACH DELETE p
|
|
`
|
|
_, _ = n.ExecuteWrite(ctx, cleanupOrphanedCypher, nil)
|
|
// Ignore errors - best effort cleanup
|
|
|
|
n.Logger.Infof("binary-to-hex migration completed successfully")
|
|
return nil
|
|
}
|
|
|
|
// migrateToTagBasedReferences converts direct REFERENCES and MENTIONS relationships
|
|
// to the new Tag-based model where:
|
|
// - Event-[:REFERENCES]->Event becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event
|
|
// - Event-[:MENTIONS]->NostrUser becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->NostrUser
|
|
//
|
|
// This enables unified tag querying via #e and #p filters while maintaining graph traversal.
|
|
func migrateToTagBasedReferences(ctx context.Context, n *N) error {
|
|
// Step 1: Count existing direct REFERENCES relationships (Event->Event)
|
|
countRefCypher := `
|
|
MATCH (source:Event)-[r:REFERENCES]->(target:Event)
|
|
RETURN count(r) AS count
|
|
`
|
|
result, err := n.ExecuteRead(ctx, countRefCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count REFERENCES relationships: %w", err)
|
|
}
|
|
|
|
var refCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
refCount = count
|
|
}
|
|
}
|
|
n.Logger.Infof("found %d direct Event-[:REFERENCES]->Event relationships to migrate", refCount)
|
|
|
|
// Step 2: Count existing direct MENTIONS relationships (Event->NostrUser)
|
|
countMentionsCypher := `
|
|
MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
|
|
RETURN count(r) AS count
|
|
`
|
|
result, err = n.ExecuteRead(ctx, countMentionsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count MENTIONS relationships: %w", err)
|
|
}
|
|
|
|
var mentionsCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
mentionsCount = count
|
|
}
|
|
}
|
|
n.Logger.Infof("found %d direct Event-[:MENTIONS]->NostrUser relationships to migrate", mentionsCount)
|
|
|
|
// If nothing to migrate, we're done
|
|
if refCount == 0 && mentionsCount == 0 {
|
|
n.Logger.Infof("no direct relationships to migrate, migration complete")
|
|
return nil
|
|
}
|
|
|
|
// Step 3: Migrate REFERENCES relationships to Tag-based model
|
|
// Process in batches to avoid memory issues with large datasets
|
|
if refCount > 0 {
|
|
n.Logger.Infof("migrating %d REFERENCES relationships to Tag-based model...", refCount)
|
|
|
|
// This query:
|
|
// 1. Finds Event->Event REFERENCES relationships
|
|
// 2. Creates/merges Tag node with type='e' and value=target event ID
|
|
// 3. Creates TAGGED_WITH from source Event to Tag
|
|
// 4. Creates REFERENCES from Tag to target Event
|
|
// 5. Deletes the old direct REFERENCES relationship
|
|
migrateRefCypher := `
|
|
MATCH (source:Event)-[r:REFERENCES]->(target:Event)
|
|
WITH source, r, target LIMIT 1000
|
|
MERGE (t:Tag {type: 'e', value: target.id})
|
|
CREATE (source)-[:TAGGED_WITH]->(t)
|
|
MERGE (t)-[:REFERENCES]->(target)
|
|
DELETE r
|
|
RETURN count(r) AS migrated
|
|
`
|
|
|
|
// Run migration in batches until no more relationships exist
|
|
totalMigrated := int64(0)
|
|
for {
|
|
result, err := n.ExecuteWrite(ctx, migrateRefCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to migrate REFERENCES batch: %w", err)
|
|
}
|
|
|
|
var batchMigrated int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
batchMigrated = count
|
|
}
|
|
}
|
|
|
|
if batchMigrated == 0 {
|
|
break
|
|
}
|
|
totalMigrated += batchMigrated
|
|
n.Logger.Infof("migrated %d REFERENCES relationships (total: %d)", batchMigrated, totalMigrated)
|
|
}
|
|
|
|
n.Logger.Infof("completed migrating %d REFERENCES relationships", totalMigrated)
|
|
}
|
|
|
|
// Step 4: Migrate MENTIONS relationships to Tag-based model
|
|
if mentionsCount > 0 {
|
|
n.Logger.Infof("migrating %d MENTIONS relationships to Tag-based model...", mentionsCount)
|
|
|
|
// This query:
|
|
// 1. Finds Event->NostrUser MENTIONS relationships
|
|
// 2. Creates/merges Tag node with type='p' and value=target pubkey
|
|
// 3. Creates TAGGED_WITH from source Event to Tag
|
|
// 4. Creates REFERENCES from Tag to target NostrUser
|
|
// 5. Deletes the old direct MENTIONS relationship
|
|
migrateMentionsCypher := `
|
|
MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
|
|
WITH source, r, target LIMIT 1000
|
|
MERGE (t:Tag {type: 'p', value: target.pubkey})
|
|
CREATE (source)-[:TAGGED_WITH]->(t)
|
|
MERGE (t)-[:REFERENCES]->(target)
|
|
DELETE r
|
|
RETURN count(r) AS migrated
|
|
`
|
|
|
|
// Run migration in batches until no more relationships exist
|
|
totalMigrated := int64(0)
|
|
for {
|
|
result, err := n.ExecuteWrite(ctx, migrateMentionsCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to migrate MENTIONS batch: %w", err)
|
|
}
|
|
|
|
var batchMigrated int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
batchMigrated = count
|
|
}
|
|
}
|
|
|
|
if batchMigrated == 0 {
|
|
break
|
|
}
|
|
totalMigrated += batchMigrated
|
|
n.Logger.Infof("migrated %d MENTIONS relationships (total: %d)", batchMigrated, totalMigrated)
|
|
}
|
|
|
|
n.Logger.Infof("completed migrating %d MENTIONS relationships", totalMigrated)
|
|
}
|
|
|
|
n.Logger.Infof("Tag-based references migration completed successfully")
|
|
return nil
|
|
}
|
|
|
|
// migrateDeduplicateReports removes duplicate REPORTS relationships
|
|
// Prior to this migration, processReport() used CREATE which allowed multiple
|
|
// REPORTS relationships with the same report_type between the same two users.
|
|
// This migration keeps only the most recent report (by created_at) for each
|
|
// (reporter, reported, report_type) combination.
|
|
func migrateDeduplicateReports(ctx context.Context, n *N) error {
|
|
// Step 1: Count duplicate REPORTS relationships
|
|
// Duplicates are defined as multiple REPORTS with the same (reporter, reported, report_type)
|
|
countDuplicatesCypher := `
|
|
MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
|
|
WITH reporter, reported, r.report_type AS type, collect(r) AS rels
|
|
WHERE size(rels) > 1
|
|
RETURN sum(size(rels) - 1) AS duplicate_count
|
|
`
|
|
result, err := n.ExecuteRead(ctx, countDuplicatesCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count duplicate REPORTS: %w", err)
|
|
}
|
|
|
|
var duplicateCount int64
|
|
if result.Next(ctx) {
|
|
if count, ok := result.Record().Values[0].(int64); ok {
|
|
duplicateCount = count
|
|
}
|
|
}
|
|
|
|
if duplicateCount == 0 {
|
|
n.Logger.Infof("no duplicate REPORTS relationships found, migration complete")
|
|
return nil
|
|
}
|
|
|
|
n.Logger.Infof("found %d duplicate REPORTS relationships to remove", duplicateCount)
|
|
|
|
// Step 2: Delete duplicate REPORTS, keeping the one with the highest created_at
|
|
// This query:
|
|
// 1. Groups REPORTS by (reporter, reported, report_type)
|
|
// 2. Finds the maximum created_at for each group
|
|
// 3. Deletes all relationships in the group except the newest one
|
|
deleteDuplicatesCypher := `
|
|
MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
|
|
WITH reporter, reported, r.report_type AS type,
|
|
collect(r) AS rels, max(r.created_at) AS maxCreatedAt
|
|
WHERE size(rels) > 1
|
|
UNWIND rels AS rel
|
|
WITH rel, maxCreatedAt
|
|
WHERE rel.created_at < maxCreatedAt
|
|
DELETE rel
|
|
RETURN count(*) AS deleted
|
|
`
|
|
|
|
writeResult, err := n.ExecuteWrite(ctx, deleteDuplicatesCypher, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete duplicate REPORTS: %w", err)
|
|
}
|
|
|
|
var deletedCount int64
|
|
if writeResult.Next(ctx) {
|
|
if count, ok := writeResult.Record().Values[0].(int64); ok {
|
|
deletedCount = count
|
|
}
|
|
}
|
|
|
|
n.Logger.Infof("deleted %d duplicate REPORTS relationships", deletedCount)
|
|
|
|
// Step 3: Mark superseded ProcessedSocialEvent nodes for deleted reports
|
|
// Find ProcessedSocialEvent nodes (kind 1984) whose event IDs are no longer
|
|
// referenced by any REPORTS relationship's created_by_event
|
|
markSupersededCypher := `
|
|
MATCH (evt:ProcessedSocialEvent {event_kind: 1984})
|
|
WHERE evt.superseded_by IS NULL
|
|
AND NOT EXISTS {
|
|
MATCH ()-[r:REPORTS]->()
|
|
WHERE r.created_by_event = evt.event_id
|
|
}
|
|
SET evt.superseded_by = 'migration_v4_dedupe'
|
|
RETURN count(evt) AS superseded
|
|
`
|
|
|
|
markResult, err := n.ExecuteWrite(ctx, markSupersededCypher, nil)
|
|
if err != nil {
|
|
// Non-fatal - just log warning
|
|
n.Logger.Warningf("failed to mark superseded ProcessedSocialEvent nodes: %v", err)
|
|
} else {
|
|
var supersededCount int64
|
|
if markResult.Next(ctx) {
|
|
if count, ok := markResult.Record().Values[0].(int64); ok {
|
|
supersededCount = count
|
|
}
|
|
}
|
|
if supersededCount > 0 {
|
|
n.Logger.Infof("marked %d ProcessedSocialEvent nodes as superseded", supersededCount)
|
|
}
|
|
}
|
|
|
|
n.Logger.Infof("REPORTS deduplication migration completed successfully")
|
|
return nil
|
|
}
|