Compare commits
2 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 9d6280eab1 | |
| | 96bdf5cba2 | |

21 CLAUDE.md
@@ -235,11 +235,18 @@ export ORLY_AUTH_TO_WRITE=false # Require auth only for writes
|
||||
**`pkg/neo4j/`** - Neo4j graph database backend with social graph support
|
||||
- `neo4j.go` - Main database implementation
|
||||
- `schema.go` - Graph schema and index definitions (includes WoT extensions)
|
||||
- `migrations.go` - Database schema migrations (v1: base, v2: WoT, v3: Tag-based e/p)
|
||||
- `query-events.go` - REQ filter to Cypher translation
|
||||
- `save-event.go` - Event storage with relationship creation
|
||||
- `save-event.go` - Event storage with Tag-based relationship creation
|
||||
- `delete.go` - Event deletion (NIP-09) with Tag traversal for deletion detection
|
||||
- `social-event-processor.go` - Processes kinds 0, 3, 1984, 10000 for social graph
|
||||
- `hex_utils.go` - Helpers for binary-to-hex tag value extraction
|
||||
- `WOT_SPEC.md` - Web of Trust data model specification (NostrUser nodes, trust metrics)
|
||||
- `MODIFYING_SCHEMA.md` - Guide for schema modifications
|
||||
- **Tests:**
|
||||
- `tag_model_test.go` - Tag-based e/p model and filter query tests
|
||||
- `save-event_test.go` - Event storage and relationship tests
|
||||
- `social-event-processor_test.go` - Social graph event processing tests
|
||||
|
||||
**`pkg/protocol/`** - Nostr protocol implementation
|
||||
- `ws/` - WebSocket message framing and parsing
|
||||
@@ -349,6 +356,11 @@ export ORLY_AUTH_TO_WRITE=false # Require auth only for writes
|
||||
- Supports multiple backends via `ORLY_DB_TYPE` environment variable
|
||||
- **Badger** (default): Embedded key-value store with custom indexing, ideal for single-instance deployments
|
||||
- **Neo4j**: Graph database with social graph and Web of Trust (WoT) extensions
|
||||
- **Tag-Based e/p Model**: All tags stored through intermediate Tag nodes
|
||||
- `Event-[:TAGGED_WITH]->Tag{type:'e'}-[:REFERENCES]->Event` for e-tags
|
||||
- `Event-[:TAGGED_WITH]->Tag{type:'p'}-[:REFERENCES]->NostrUser` for p-tags
|
||||
- Enables unified querying: `#e` and `#p` filter queries work correctly
|
||||
- Automatic migration from direct REFERENCES/MENTIONS (v3 migration)
|
||||
- Processes kinds 0 (profile), 3 (contacts), 1984 (reports), 10000 (mute list) for social graph
|
||||
- NostrUser nodes with trust metrics (influence, PageRank)
|
||||
- FOLLOWS, MUTES, REPORTS relationships for WoT analysis
|
||||
@@ -816,11 +828,18 @@ The directory spider (`pkg/spider/directory.go`) automatically discovers and syn
|
||||
|
||||
### Neo4j Social Graph Backend
|
||||
The Neo4j backend (`pkg/neo4j/`) includes Web of Trust (WoT) extensions:
|
||||
- **Tag-Based e/p Model**: All tags (including e/p) stored through intermediate Tag nodes
|
||||
- `Event-[:TAGGED_WITH]->Tag{type:'e'}-[:REFERENCES]->Event`
|
||||
- `Event-[:TAGGED_WITH]->Tag{type:'p'}-[:REFERENCES]->NostrUser`
|
||||
- Enables unified tag querying (`#e` and `#p` filter queries now work)
|
||||
- v3 migration automatically converts existing direct REFERENCES/MENTIONS
|
||||
- **Social Event Processor**: Handles kinds 0, 3, 1984, 10000 for social graph management
|
||||
- **NostrUser nodes**: Store profile data and trust metrics (influence, PageRank)
|
||||
- **Relationships**: FOLLOWS, MUTES, REPORTS for social graph analysis
|
||||
- **Deletion Detection**: `CheckForDeleted()` uses Tag traversal for kind 5 event checks
|
||||
- **WoT Schema**: See `pkg/neo4j/WOT_SPEC.md` for full specification
|
||||
- **Schema Modifications**: See `pkg/neo4j/MODIFYING_SCHEMA.md` for how to update
|
||||
- **Comprehensive Tests**: `tag_model_test.go` covers Tag-based model, filter queries, migrations
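
The deletion check described in the list above can be pictured without a database. The following is a minimal in-memory sketch, not the backend's code: the `Event` struct and `isDeleted` function are hypothetical names, and the rule (a kind 5 event that e-tags the target and is authored by the target's author or an admin) is taken from the `CheckForDeleted()` query elsewhere in this change.

```go
package main

// Event is a pared-down, hypothetical stand-in for the relay's event type.
type Event struct {
	ID     string   // lowercase hex event ID
	Pubkey string   // lowercase hex author pubkey
	Kind   int      // Nostr event kind (5 = deletion request)
	ETags  []string // values of the event's e-tags
}

// isDeleted reports whether target is covered by a kind 5 event that
// e-tags it and was authored by the target's author or by an admin.
func isDeleted(target Event, all []Event, admins map[string]bool) bool {
	for _, ev := range all {
		if ev.Kind != 5 {
			continue
		}
		// Only the original author or an admin may delete.
		if ev.Pubkey != target.Pubkey && !admins[ev.Pubkey] {
			continue
		}
		for _, ref := range ev.ETags {
			if ref == target.ID {
				return true
			}
		}
	}
	return false
}
```

In the graph, the inner loop corresponds to following `TAGGED_WITH` to a `Tag{type:'e'}` node and then `REFERENCES` to the target event.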
|
||||
|
||||
### WasmDB IndexedDB Backend
|
||||
WebAssembly-compatible database backend (`pkg/wasmdb/`):
|
||||
|
||||
@@ -35,10 +35,12 @@ export ORLY_NEO4J_PASSWORD=password
|
||||
## Features
|
||||
|
||||
- **Graph-Native Storage**: Events, authors, and tags stored as nodes and relationships
|
||||
- **Unified Tag Model**: All tags (including e/p tags) stored as Tag nodes with REFERENCES relationships
|
||||
- **Efficient Queries**: Leverages Neo4j's native graph traversal for tag and social graph queries
|
||||
- **Cypher Query Language**: Powerful, expressive query language for complex filters
|
||||
- **Automatic Indexing**: Unique constraints and indexes for optimal performance
|
||||
- **Relationship Queries**: Native support for event references, mentions, and tags
|
||||
- **Automatic Migrations**: Schema migrations run automatically on startup
|
||||
- **Web of Trust (WoT) Extensions**: Optional support for trust metrics, social graph analysis, and content filtering (see [WOT_SPEC.md](./WOT_SPEC.md))
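
As a rough illustration of what "run automatically on startup" can mean for an ordered migration list, here is a minimal sketch. It assumes applied versions are tracked in a simple set; how the backend actually records them is not shown in this change, and `runPending` is a hypothetical name. The `Migration` struct mirrors the `Version`/`Description`/`Migrate` fields of the real list, but its `Migrate` takes no arguments to keep the sketch standalone.

```go
package main

import "fmt"

// Migration mirrors the shape of a migration entry; Migrate is simplified
// to take no arguments (an assumption made for this sketch).
type Migration struct {
	Version     string
	Description string
	Migrate     func() error
}

// runPending runs, in order, every migration whose version is not yet in
// applied, and records each one as applied once it succeeds.
func runPending(migrations []Migration, applied map[string]bool) ([]string, error) {
	var ran []string
	for _, m := range migrations {
		if applied[m.Version] {
			continue
		}
		if err := m.Migrate(); err != nil {
			// Stop at the first failure so later migrations never run
			// against a partially migrated schema.
			return ran, fmt.Errorf("migration %s failed: %w", m.Version, err)
		}
		applied[m.Version] = true
		ran = append(ran, m.Version)
	}
	return ran, nil
}
```

Running it a second time with the same `applied` set is a no-op, which is the property that makes unconditional execution at startup safe.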
|
||||
|
||||
## Architecture
|
||||
@@ -50,6 +52,23 @@ See [docs/NEO4J_BACKEND.md](../../docs/NEO4J_BACKEND.md) for comprehensive docum
|
||||
- Development guide
|
||||
- Comparison with other backends
|
||||
|
||||
### Tag-Based e/p Model
|
||||
|
||||
All tags, including `e` (event references) and `p` (pubkey mentions), are stored through intermediate Tag nodes:
|
||||
|
||||
```
|
||||
Event -[:TAGGED_WITH]-> Tag{type:'e',value:eventId} -[:REFERENCES]-> Event
|
||||
Event -[:TAGGED_WITH]-> Tag{type:'p',value:pubkey} -[:REFERENCES]-> NostrUser
|
||||
Event -[:TAGGED_WITH]-> Tag{type:'t',value:topic} (no REFERENCES for regular tags)
|
||||
```
|
||||
|
||||
**Benefits:**
|
||||
- Unified tag querying: `#e` and `#p` filter queries work correctly
|
||||
- Consistent data model: All tags use the same TAGGED_WITH pattern
|
||||
- Graph traversal: Can traverse from events through tags to referenced entities
|
||||
|
||||
**Migration:** Existing databases with direct `REFERENCES`/`MENTIONS` relationships are automatically migrated at startup via v3 migration.
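
To make the three patterns above concrete, the sketch below lists the relationships implied by a single tag. It is an illustration only: `Edge` and `edgesForTag` are hypothetical names, node identities are written as plain strings, and the existence check that the backend applies before linking an `e` tag to its target event is deliberately left out.

```go
package main

// Edge is a hypothetical, flattened view of one relationship to create.
type Edge struct {
	From string // source node, e.g. "Event:<id>" or "Tag:<type>:<value>"
	Rel  string // relationship type
	To   string // target node
}

// edgesForTag returns the relationships the Tag-based model implies for
// one tag on the event with the given ID.
func edgesForTag(eventID, tagType, value string) []Edge {
	tag := "Tag:" + tagType + ":" + value
	// Every tag, whatever its type, is attached through TAGGED_WITH.
	edges := []Edge{{From: "Event:" + eventID, Rel: "TAGGED_WITH", To: tag}}
	switch tagType {
	case "e":
		edges = append(edges, Edge{From: tag, Rel: "REFERENCES", To: "Event:" + value})
	case "p":
		edges = append(edges, Edge{From: tag, Rel: "REFERENCES", To: "NostrUser:" + value})
	}
	// Regular tags such as "t" get no REFERENCES edge.
	return edges
}
```

A `t` tag yields one edge, while `e` and `p` tags yield two, which is why `#e` and `#p` filters can reuse the same Tag lookup as every other tag.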
|
||||
|
||||
### Web of Trust (WoT) Extensions
|
||||
|
||||
This package includes schema support for Web of Trust trust metrics computation:
|
||||
@@ -96,6 +115,8 @@ This package includes schema support for Web of Trust trust metrics computation:
|
||||
|
||||
### Tests
|
||||
- `social-event-processor_test.go` - Comprehensive tests for kinds 0, 3, 1984, 10000
|
||||
- `tag_model_test.go` - Tag-based e/p model tests and filter query tests
|
||||
- `save-event_test.go` - Event storage and relationship tests
|
||||
|
||||
## Testing
|
||||
|
||||
@@ -166,11 +187,25 @@ MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: "t", value: "bitcoin"})
 RETURN e
 ```

+### Event reference query (e-tags)
+```cypher
+MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: "e"})-[:REFERENCES]->(ref:Event)
+WHERE e.id = "abc123..."
+RETURN e, ref
+```
+
+### Mentions query (p-tags)
+```cypher
+MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: "p"})-[:REFERENCES]->(u:NostrUser)
+WHERE e.id = "abc123..."
+RETURN e, u
+```
+
 ### Social graph query
 ```cypher
 MATCH (author:NostrUser {pubkey: "abc123..."})
 <-[:AUTHORED_BY]-(e:Event)
--[:MENTIONS]->(mentioned:NostrUser)
+-[:TAGGED_WITH]->(:Tag {type: "p"})-[:REFERENCES]->(mentioned:NostrUser)
 RETURN author, e, mentioned
 ```

|
||||
@@ -125,6 +125,40 @@ Legacy node label that is redundant with SetOfNostrUserWotMetricsCards. Should b
|
||||
|
||||
### Relationship Types
|
||||
|
||||
#### Tag-Based References (e and p tags)
|
||||
|
||||
The Neo4j backend uses a unified Tag-based model for `e` and `p` tags, enabling consistent tag querying while maintaining graph traversal capabilities.
|
||||
|
||||
**E-tags (Event References):**
|
||||
```
|
||||
(Event)-[:TAGGED_WITH]->(Tag {type: 'e', value: <event_id>})-[:REFERENCES]->(Event)
|
||||
```
|
||||
|
||||
**P-tags (Pubkey Mentions):**
|
||||
```
|
||||
(Event)-[:TAGGED_WITH]->(Tag {type: 'p', value: <pubkey>})-[:REFERENCES]->(NostrUser)
|
||||
```
|
||||
|
||||
This model provides:
|
||||
- Unified tag querying via `#e` and `#p` filters (same as other tags)
|
||||
- Graph traversal from events to referenced events/users
|
||||
- Consistent indexing through existing Tag node indexes
|
||||
|
||||
**Query Examples:**
|
||||
```cypher
|
||||
// Find all events that reference a specific event
|
||||
MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: 'e', value: $eventId})-[:REFERENCES]->(ref:Event)
|
||||
RETURN e
|
||||
|
||||
// Find all events that mention a specific pubkey
|
||||
MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: 'p', value: $pubkey})-[:REFERENCES]->(u:NostrUser)
|
||||
RETURN e
|
||||
|
||||
// Count references to an event (thread replies)
|
||||
MATCH (t:Tag {type: 'e', value: $eventId})<-[:TAGGED_WITH]-(e:Event)
|
||||
RETURN count(e) AS replyCount
|
||||
```
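
From Go, queries like the ones above are typically passed to the driver as a query string plus a parameter map. The sketch below builds both for an `e` or `p` tag lookup. It is an assumption-laden illustration: `buildTagQuery` is a hypothetical helper, not part of the package, and it only constructs the inputs rather than executing anything.

```go
package main

import "fmt"

// buildTagQuery returns a parameterized Cypher query and its parameters
// for finding events that carry an e- or p-tag with the given value.
func buildTagQuery(tagType, value string) (string, map[string]any, error) {
	// The target label is chosen from a fixed whitelist, so nothing
	// caller-supplied is ever interpolated into the query text.
	var target string
	switch tagType {
	case "e":
		target = "Event"
	case "p":
		target = "NostrUser"
	default:
		return "", nil, fmt.Errorf("unsupported tag type %q", tagType)
	}
	cypher := fmt.Sprintf(
		"MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: $type, value: $value})-[:REFERENCES]->(ref:%s)\nRETURN e",
		target,
	)
	return cypher, map[string]any{"type": tagType, "value": value}, nil
}
```

The tag type and value travel as `$type` and `$value` parameters, which keeps hex IDs and pubkeys out of the query text.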
|
||||
|
||||
#### 1. FOLLOWS
|
||||
|
||||
Represents a follow relationship between users (derived from kind 3 events).
|
||||
@@ -151,11 +185,18 @@ Represents a report filed against a user (derived from kind 1984 events).

 **Direction:** `(reporter:NostrUser)-[:REPORTS]->(reported:NostrUser)`

-**Properties:**
-- `reportType` (string) - NIP-56 report type (impersonation, spam, illegal, malware, nsfw, etc.)
-- `timestamp` (integer) - When the report was filed
+**Deduplication:** Only one REPORTS relationship exists per (reporter, reported, report_type) combination.
+Multiple reports of the same type from the same user to the same target update the existing
+relationship with the most recent event's data. This prevents double-counting in GrapeRank
+calculations while maintaining audit trails via ProcessedSocialEvent nodes.

-**Source:** Created from kind 1984 (reporting) events
+**Properties:**
+- `report_type` (string) - NIP-56 report type (impersonation, spam, illegal, malware, nsfw, etc.)
+- `created_at` (integer) - Timestamp of the most recent report event
+- `created_by_event` (string) - Event ID of the most recent report
+- `relay_received_at` (integer) - When the relay first received any report of this type
+
+**Source:** Created/updated from kind 1984 (reporting) events
|
||||
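
The deduplication rule for REPORTS can be sketched in memory as follows. This is a simplified illustration, not the backend's implementation: `Report`, `reportKey`, and `dedupeReports` are hypothetical names, and "most recent" is assumed to mean the highest `created_at`.

```go
package main

// Report is a hypothetical flattened view of one REPORTS relationship.
type Report struct {
	Reporter   string
	Reported   string
	ReportType string
	CreatedAt  int64
	EventID    string
}

// reportKey is the (reporter, reported, report_type) deduplication key.
type reportKey struct{ reporter, reported, reportType string }

// dedupeReports keeps only the most recent report for each key.
func dedupeReports(reports []Report) map[reportKey]Report {
	latest := make(map[reportKey]Report)
	for _, r := range reports {
		k := reportKey{r.Reporter, r.Reported, r.ReportType}
		// Replace the stored report only when this one is strictly newer.
		if cur, ok := latest[k]; !ok || r.CreatedAt > cur.CreatedAt {
			latest[k] = r
		}
	}
	return latest
}
```

Two `spam` reports from the same reporter against the same target collapse into one entry, while an `nsfw` report between the same pair stays separate.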
#### 4. WOT_METRICS_CARDS
|
||||
|
||||
@@ -187,7 +228,7 @@ The WoT model processes the following Nostr event kinds:
 |------|------|---------|--------------|
 | 0 | Profile Metadata | User profile information | Update NostrUser properties (npub, name, etc.) |
 | 3 | Contact List | Follow list | Create/update FOLLOWS relationships |
-| 1984 | Reporting | Report users/content | Create REPORTS relationships with reportType |
+| 1984 | Reporting | Report users/content | Create/update REPORTS relationships (deduplicated by report_type) |
 | 10000 | Mute List | Mute list | Create/update MUTES relationships |
 | 30382 | Trusted Assertion (NIP-85) | Published trust metrics | Create/update NostrUserWotMetricsCard nodes |

@@ -247,8 +288,9 @@ Comprehensive implementation with additional features:
 - `IS_A_REACTION_TO` (kind 7 reactions)
 - `IS_A_RESPONSE_TO` (kind 1 replies)
 - `IS_A_REPOST_OF` (kind 6, kind 16 reposts)
-- `P_TAGGED` (p-tag mentions from events to users)
-- `E_TAGGED` (e-tag references from events to events)
+- Tag-based references (see "Tag-Based References" section above):
+  - `Event-[:TAGGED_WITH]->Tag{type:'p'}-[:REFERENCES]->NostrUser` (p-tag mentions)
+  - `Event-[:TAGGED_WITH]->Tag{type:'e'}-[:REFERENCES]->Event` (e-tag references)
 - NostrRelay, CashuMint nodes for ecosystem mapping
 - Enhanced GrapeRank incorporating zaps, replies, reactions

|
||||
@@ -175,14 +175,15 @@ func (n *N) ProcessDelete(ev *event.E, admins [][]byte) error {

 // CheckForDeleted checks if an event has been deleted
 func (n *N) CheckForDeleted(ev *event.E, admins [][]byte) error {
-	// Query for kind 5 events that reference this event
+	// Query for kind 5 events that reference this event via Tag nodes
 	ctx := context.Background()
 	idStr := hex.Enc(ev.ID[:])

 	// Build cypher query to find deletion events
+	// Traverses through Tag nodes: Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event
 	cypher := `
 MATCH (target:Event {id: $targetId})
-MATCH (delete:Event {kind: 5})-[:REFERENCES]->(target)
+MATCH (delete:Event {kind: 5})-[:TAGGED_WITH]->(t:Tag {type: 'e'})-[:REFERENCES]->(target)
 WHERE delete.pubkey = $pubkey OR delete.pubkey IN $admins
 RETURN delete.id AS id
 LIMIT 1`
|
||||
@@ -25,6 +25,16 @@ var migrations = []Migration{
|
||||
Description: "Clean up binary-encoded pubkeys and event IDs to lowercase hex",
|
||||
Migrate: migrateBinaryToHex,
|
||||
},
|
||||
{
|
||||
Version: "v3",
|
||||
Description: "Convert direct REFERENCES/MENTIONS relationships to Tag-based model",
|
||||
Migrate: migrateToTagBasedReferences,
|
||||
},
|
||||
{
|
||||
Version: "v4",
|
||||
Description: "Deduplicate REPORTS relationships by (reporter, reported, report_type)",
|
||||
Migrate: migrateDeduplicateReports,
|
||||
},
|
||||
}
|
||||
|
||||
// RunMigrations executes all pending migrations
|
||||
@@ -343,3 +353,245 @@ func migrateBinaryToHex(ctx context.Context, n *N) error {
|
||||
n.Logger.Infof("binary-to-hex migration completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateToTagBasedReferences converts direct REFERENCES and MENTIONS relationships
|
||||
// to the new Tag-based model where:
|
||||
// - Event-[:REFERENCES]->Event becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event
|
||||
// - Event-[:MENTIONS]->NostrUser becomes Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->NostrUser
|
||||
//
|
||||
// This enables unified tag querying via #e and #p filters while maintaining graph traversal.
|
||||
func migrateToTagBasedReferences(ctx context.Context, n *N) error {
|
||||
// Step 1: Count existing direct REFERENCES relationships (Event->Event)
|
||||
countRefCypher := `
|
||||
MATCH (source:Event)-[r:REFERENCES]->(target:Event)
|
||||
RETURN count(r) AS count
|
||||
`
|
||||
result, err := n.ExecuteRead(ctx, countRefCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count REFERENCES relationships: %w", err)
|
||||
}
|
||||
|
||||
var refCount int64
|
||||
if result.Next(ctx) {
|
||||
if count, ok := result.Record().Values[0].(int64); ok {
|
||||
refCount = count
|
||||
}
|
||||
}
|
||||
n.Logger.Infof("found %d direct Event-[:REFERENCES]->Event relationships to migrate", refCount)
|
||||
|
||||
// Step 2: Count existing direct MENTIONS relationships (Event->NostrUser)
|
||||
countMentionsCypher := `
|
||||
MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
|
||||
RETURN count(r) AS count
|
||||
`
|
||||
result, err = n.ExecuteRead(ctx, countMentionsCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count MENTIONS relationships: %w", err)
|
||||
}
|
||||
|
||||
var mentionsCount int64
|
||||
if result.Next(ctx) {
|
||||
if count, ok := result.Record().Values[0].(int64); ok {
|
||||
mentionsCount = count
|
||||
}
|
||||
}
|
||||
n.Logger.Infof("found %d direct Event-[:MENTIONS]->NostrUser relationships to migrate", mentionsCount)
|
||||
|
||||
// If nothing to migrate, we're done
|
||||
if refCount == 0 && mentionsCount == 0 {
|
||||
n.Logger.Infof("no direct relationships to migrate, migration complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Step 3: Migrate REFERENCES relationships to Tag-based model
|
||||
// Process in batches to avoid memory issues with large datasets
|
||||
if refCount > 0 {
|
||||
n.Logger.Infof("migrating %d REFERENCES relationships to Tag-based model...", refCount)
|
||||
|
||||
// This query:
|
||||
// 1. Finds Event->Event REFERENCES relationships
|
||||
// 2. Creates/merges Tag node with type='e' and value=target event ID
|
||||
// 3. Creates TAGGED_WITH from source Event to Tag
|
||||
// 4. Creates REFERENCES from Tag to target Event
|
||||
// 5. Deletes the old direct REFERENCES relationship
|
||||
migrateRefCypher := `
|
||||
MATCH (source:Event)-[r:REFERENCES]->(target:Event)
|
||||
WITH source, r, target LIMIT 1000
|
||||
MERGE (t:Tag {type: 'e', value: target.id})
|
||||
CREATE (source)-[:TAGGED_WITH]->(t)
|
||||
MERGE (t)-[:REFERENCES]->(target)
|
||||
DELETE r
|
||||
RETURN count(r) AS migrated
|
||||
`
|
||||
|
||||
// Run migration in batches until no more relationships exist
|
||||
totalMigrated := int64(0)
|
||||
for {
|
||||
result, err := n.ExecuteWrite(ctx, migrateRefCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to migrate REFERENCES batch: %w", err)
|
||||
}
|
||||
|
||||
var batchMigrated int64
|
||||
if result.Next(ctx) {
|
||||
if count, ok := result.Record().Values[0].(int64); ok {
|
||||
batchMigrated = count
|
||||
}
|
||||
}
|
||||
|
||||
if batchMigrated == 0 {
|
||||
break
|
||||
}
|
||||
totalMigrated += batchMigrated
|
||||
n.Logger.Infof("migrated %d REFERENCES relationships (total: %d)", batchMigrated, totalMigrated)
|
||||
}
|
||||
|
||||
n.Logger.Infof("completed migrating %d REFERENCES relationships", totalMigrated)
|
||||
}
|
||||
|
||||
// Step 4: Migrate MENTIONS relationships to Tag-based model
|
||||
if mentionsCount > 0 {
|
||||
n.Logger.Infof("migrating %d MENTIONS relationships to Tag-based model...", mentionsCount)
|
||||
|
||||
// This query:
|
||||
// 1. Finds Event->NostrUser MENTIONS relationships
|
||||
// 2. Creates/merges Tag node with type='p' and value=target pubkey
|
||||
// 3. Creates TAGGED_WITH from source Event to Tag
|
||||
// 4. Creates REFERENCES from Tag to target NostrUser
|
||||
// 5. Deletes the old direct MENTIONS relationship
|
||||
migrateMentionsCypher := `
|
||||
MATCH (source:Event)-[r:MENTIONS]->(target:NostrUser)
|
||||
WITH source, r, target LIMIT 1000
|
||||
MERGE (t:Tag {type: 'p', value: target.pubkey})
|
||||
CREATE (source)-[:TAGGED_WITH]->(t)
|
||||
MERGE (t)-[:REFERENCES]->(target)
|
||||
DELETE r
|
||||
RETURN count(r) AS migrated
|
||||
`
|
||||
|
||||
// Run migration in batches until no more relationships exist
|
||||
totalMigrated := int64(0)
|
||||
for {
|
||||
result, err := n.ExecuteWrite(ctx, migrateMentionsCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to migrate MENTIONS batch: %w", err)
|
||||
}
|
||||
|
||||
var batchMigrated int64
|
||||
if result.Next(ctx) {
|
||||
if count, ok := result.Record().Values[0].(int64); ok {
|
||||
batchMigrated = count
|
||||
}
|
||||
}
|
||||
|
||||
if batchMigrated == 0 {
|
||||
break
|
||||
}
|
||||
totalMigrated += batchMigrated
|
||||
n.Logger.Infof("migrated %d MENTIONS relationships (total: %d)", batchMigrated, totalMigrated)
|
||||
}
|
||||
|
||||
n.Logger.Infof("completed migrating %d MENTIONS relationships", totalMigrated)
|
||||
}
|
||||
|
||||
n.Logger.Infof("Tag-based references migration completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateDeduplicateReports removes duplicate REPORTS relationships
|
||||
// Prior to this migration, processReport() used CREATE which allowed multiple
|
||||
// REPORTS relationships with the same report_type between the same two users.
|
||||
// This migration keeps only the most recent report (by created_at) for each
|
||||
// (reporter, reported, report_type) combination.
|
||||
func migrateDeduplicateReports(ctx context.Context, n *N) error {
|
||||
// Step 1: Count duplicate REPORTS relationships
|
||||
// Duplicates are defined as multiple REPORTS with the same (reporter, reported, report_type)
|
||||
countDuplicatesCypher := `
|
||||
MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
|
||||
WITH reporter, reported, r.report_type AS type, collect(r) AS rels
|
||||
WHERE size(rels) > 1
|
||||
RETURN sum(size(rels) - 1) AS duplicate_count
|
||||
`
|
||||
result, err := n.ExecuteRead(ctx, countDuplicatesCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count duplicate REPORTS: %w", err)
|
||||
}
|
||||
|
||||
var duplicateCount int64
|
||||
if result.Next(ctx) {
|
||||
if count, ok := result.Record().Values[0].(int64); ok {
|
||||
duplicateCount = count
|
||||
}
|
||||
}
|
||||
|
||||
if duplicateCount == 0 {
|
||||
n.Logger.Infof("no duplicate REPORTS relationships found, migration complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
n.Logger.Infof("found %d duplicate REPORTS relationships to remove", duplicateCount)
|
||||
|
||||
// Step 2: Delete duplicate REPORTS, keeping the one with the highest created_at
|
||||
// This query:
|
||||
// 1. Groups REPORTS by (reporter, reported, report_type)
|
||||
// 2. Finds the maximum created_at for each group
|
||||
// 3. Deletes all relationships in the group except the newest one
|
||||
deleteDuplicatesCypher := `
|
||||
MATCH (reporter:NostrUser)-[r:REPORTS]->(reported:NostrUser)
|
||||
WITH reporter, reported, r.report_type AS type,
|
||||
collect(r) AS rels, max(r.created_at) AS maxCreatedAt
|
||||
WHERE size(rels) > 1
|
||||
UNWIND rels AS rel
|
||||
WITH rel, maxCreatedAt
|
||||
WHERE rel.created_at < maxCreatedAt
|
||||
DELETE rel
|
||||
RETURN count(*) AS deleted
|
||||
`
|
||||
|
||||
writeResult, err := n.ExecuteWrite(ctx, deleteDuplicatesCypher, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete duplicate REPORTS: %w", err)
|
||||
}
|
||||
|
||||
var deletedCount int64
|
||||
if writeResult.Next(ctx) {
|
||||
if count, ok := writeResult.Record().Values[0].(int64); ok {
|
||||
deletedCount = count
|
||||
}
|
||||
}
|
||||
|
||||
n.Logger.Infof("deleted %d duplicate REPORTS relationships", deletedCount)
|
||||
|
||||
// Step 3: Mark superseded ProcessedSocialEvent nodes for deleted reports
|
||||
// Find ProcessedSocialEvent nodes (kind 1984) whose event IDs are no longer
|
||||
// referenced by any REPORTS relationship's created_by_event
|
||||
markSupersededCypher := `
|
||||
MATCH (evt:ProcessedSocialEvent {event_kind: 1984})
|
||||
WHERE evt.superseded_by IS NULL
|
||||
AND NOT EXISTS {
|
||||
MATCH ()-[r:REPORTS]->()
|
||||
WHERE r.created_by_event = evt.event_id
|
||||
}
|
||||
SET evt.superseded_by = 'migration_v4_dedupe'
|
||||
RETURN count(evt) AS superseded
|
||||
`
|
||||
|
||||
markResult, err := n.ExecuteWrite(ctx, markSupersededCypher, nil)
|
||||
if err != nil {
|
||||
// Non-fatal - just log warning
|
||||
n.Logger.Warningf("failed to mark superseded ProcessedSocialEvent nodes: %v", err)
|
||||
} else {
|
||||
var supersededCount int64
|
||||
if markResult.Next(ctx) {
|
||||
if count, ok := markResult.Record().Values[0].(int64); ok {
|
||||
supersededCount = count
|
||||
}
|
||||
}
|
||||
if supersededCount > 0 {
|
||||
n.Logger.Infof("marked %d ProcessedSocialEvent nodes as superseded", supersededCount)
|
||||
}
|
||||
}
|
||||
|
||||
n.Logger.Infof("REPORTS deduplication migration completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -238,7 +238,8 @@ func (n *N) addTagsInBatches(c context.Context, eventID string, ev *event.E) err
 }

 // addPTagsInBatches adds p-tag (pubkey mention) relationships using UNWIND for efficiency.
-// Creates NostrUser nodes for mentioned pubkeys and MENTIONS relationships.
+// Creates Tag nodes with type='p' and REFERENCES relationships to NostrUser nodes.
+// This enables unified tag querying via #p filters while maintaining the social graph.
 func (n *N) addPTagsInBatches(c context.Context, eventID string, pTags []string) error {
 	// Process in batches to avoid memory issues
 	for i := 0; i < len(pTags); i += tagBatchSize {
@@ -249,12 +250,17 @@ func (n *N) addPTagsInBatches(c context.Context, eventID string, pTags []string)
 		batch := pTags[i:end]

 		// Use UNWIND to process multiple p-tags in a single query
+		// Creates Tag nodes as intermediaries, enabling unified #p filter queries
+		// Tag-[:REFERENCES]->NostrUser allows graph traversal from tag to user
 		cypher := `
 MATCH (e:Event {id: $eventId})
 UNWIND $pubkeys AS pubkey
+MERGE (t:Tag {type: 'p', value: pubkey})
+CREATE (e)-[:TAGGED_WITH]->(t)
+WITH t, pubkey
 MERGE (u:NostrUser {pubkey: pubkey})
 ON CREATE SET u.created_at = timestamp()
-CREATE (e)-[:MENTIONS]->(u)`
+MERGE (t)-[:REFERENCES]->(u)`

 		params := map[string]any{
 			"eventId": eventID,
@@ -270,7 +276,8 @@ CREATE (e)-[:MENTIONS]->(u)`
 }

 // addETagsInBatches adds e-tag (event reference) relationships using UNWIND for efficiency.
-// Only creates REFERENCES relationships if the referenced event exists.
+// Creates Tag nodes with type='e' and REFERENCES relationships to Event nodes (if they exist).
+// This enables unified tag querying via #e filters while maintaining event graph structure.
 func (n *N) addETagsInBatches(c context.Context, eventID string, eTags []string) error {
 	// Process in batches to avoid memory issues
 	for i := 0; i < len(eTags); i += tagBatchSize {
@@ -281,14 +288,18 @@ func (n *N) addETagsInBatches(c context.Context, eventID string, eTags []string)
 		batch := eTags[i:end]

 		// Use UNWIND to process multiple e-tags in a single query
-		// OPTIONAL MATCH ensures we only create relationships if referenced event exists
+		// Creates Tag nodes as intermediaries, enabling unified #e filter queries
+		// Tag-[:REFERENCES]->Event allows graph traversal from tag to referenced event
+		// OPTIONAL MATCH ensures we only create REFERENCES if referenced event exists
 		cypher := `
 MATCH (e:Event {id: $eventId})
 UNWIND $eventIds AS refId
+MERGE (t:Tag {type: 'e', value: refId})
+CREATE (e)-[:TAGGED_WITH]->(t)
+WITH t, refId
 OPTIONAL MATCH (ref:Event {id: refId})
-WITH e, ref
 WHERE ref IS NOT NULL
-CREATE (e)-[:REFERENCES]->(ref)`
+MERGE (t)-[:REFERENCES]->(ref)`

 		params := map[string]any{
 			"eventId": eventID,
|
||||
@@ -151,7 +151,7 @@ func TestSafePrefix(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestSaveEvent_ETagReference tests that events with e-tags are saved correctly
|
||||
// and the REFERENCES relationships are created when the referenced event exists.
|
||||
// using the Tag-based model: Event-[:TAGGED_WITH]->Tag-[:REFERENCES]->Event.
|
||||
// Uses shared testDB from testmain_test.go to avoid auth rate limiting.
|
||||
func TestSaveEvent_ETagReference(t *testing.T) {
|
||||
if testDB == nil {
|
||||
@@ -226,10 +226,10 @@ func TestSaveEvent_ETagReference(t *testing.T) {
|
||||
t.Fatal("Reply event should not exist yet")
|
||||
}
|
||||
|
||||
// Verify REFERENCES relationship was created
|
||||
// Verify Tag-based e-tag model: Event-[:TAGGED_WITH]->Tag{type:'e'}-[:REFERENCES]->Event
|
||||
cypher := `
|
||||
MATCH (reply:Event {id: $replyId})-[:REFERENCES]->(root:Event {id: $rootId})
|
||||
RETURN reply.id AS replyId, root.id AS rootId
|
||||
MATCH (reply:Event {id: $replyId})-[:TAGGED_WITH]->(t:Tag {type: 'e', value: $rootId})-[:REFERENCES]->(root:Event {id: $rootId})
|
||||
RETURN reply.id AS replyId, t.value AS tagValue, root.id AS rootId
|
||||
`
|
||||
params := map[string]any{
|
||||
"replyId": hex.Enc(replyEvent.ID[:]),
|
||||
@@ -238,42 +238,43 @@ func TestSaveEvent_ETagReference(t *testing.T) {
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query REFERENCES relationship: %v", err)
|
||||
t.Fatalf("Failed to query Tag-based REFERENCES: %v", err)
|
||||
}
|
||||
|
||||
if !result.Next(ctx) {
|
||||
t.Error("Expected REFERENCES relationship between reply and root events")
|
||||
t.Error("Expected Tag-based REFERENCES relationship between reply and root events")
|
||||
} else {
|
||||
record := result.Record()
|
||||
returnedReplyId := record.Values[0].(string)
|
||||
returnedRootId := record.Values[1].(string)
|
||||
t.Logf("✓ REFERENCES relationship verified: %s -> %s", returnedReplyId[:8], returnedRootId[:8])
|
||||
tagValue := record.Values[1].(string)
|
||||
returnedRootId := record.Values[2].(string)
|
||||
t.Logf("✓ Tag-based REFERENCES verified: Event(%s) -> Tag{e:%s} -> Event(%s)", returnedReplyId[:8], tagValue[:8], returnedRootId[:8])
|
||||
}
|
||||
|
||||
// Verify MENTIONS relationship was also created for the p-tag
|
||||
mentionsCypher := `
|
||||
MATCH (reply:Event {id: $replyId})-[:MENTIONS]->(author:NostrUser {pubkey: $authorPubkey})
|
||||
RETURN author.pubkey AS pubkey
|
||||
// Verify Tag-based p-tag model: Event-[:TAGGED_WITH]->Tag{type:'p'}-[:REFERENCES]->NostrUser
|
||||
pTagCypher := `
|
||||
MATCH (reply:Event {id: $replyId})-[:TAGGED_WITH]->(t:Tag {type: 'p', value: $authorPubkey})-[:REFERENCES]->(author:NostrUser {pubkey: $authorPubkey})
|
||||
RETURN author.pubkey AS pubkey, t.value AS tagValue
|
||||
`
|
||||
mentionsParams := map[string]any{
|
||||
pTagParams := map[string]any{
|
||||
"replyId": hex.Enc(replyEvent.ID[:]),
|
||||
"authorPubkey": hex.Enc(alice.Pub()),
|
||||
}
|
||||
|
||||
mentionsResult, err := testDB.ExecuteRead(ctx, mentionsCypher, mentionsParams)
|
||||
pTagResult, err := testDB.ExecuteRead(ctx, pTagCypher, pTagParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query MENTIONS relationship: %v", err)
|
||||
t.Fatalf("Failed to query Tag-based p-tag: %v", err)
|
||||
}
|
||||
|
||||
if !mentionsResult.Next(ctx) {
|
||||
t.Error("Expected MENTIONS relationship for p-tag")
|
||||
if !pTagResult.Next(ctx) {
|
||||
t.Error("Expected Tag-based p-tag relationship")
|
||||
} else {
|
||||
t.Logf("✓ MENTIONS relationship verified")
|
||||
t.Logf("✓ Tag-based p-tag relationship verified")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSaveEvent_ETagMissingReference tests that e-tags to non-existent events
|
||||
// don't create broken relationships (batched processing handles this gracefully).
|
||||
// create Tag nodes but don't create REFERENCES relationships to missing events.
|
||||
// Uses shared testDB from testmain_test.go to avoid auth rate limiting.
|
||||
func TestSaveEvent_ETagMissingReference(t *testing.T) {
|
||||
if testDB == nil {
|
||||
@@ -331,29 +332,50 @@ func TestSaveEvent_ETagMissingReference(t *testing.T) {
|
||||
t.Error("Event should have been saved despite missing reference")
|
||||
}
|
||||
|
||||
// Verify no REFERENCES relationship was created (as the target doesn't exist)
|
||||
// Verify Tag node was created with TAGGED_WITH relationship
|
||||
tagCypher := `
|
||||
MATCH (e:Event {id: $eventId})-[:TAGGED_WITH]->(t:Tag {type: 'e', value: $refId})
|
||||
RETURN t.value AS tagValue
|
||||
`
|
||||
tagParams := map[string]any{
|
||||
"eventId": hex.Enc(ev.ID[:]),
|
||||
"refId": nonExistentEventID,
|
||||
}
|
||||
|
||||
tagResult, err := testDB.ExecuteRead(ctx, tagCypher, tagParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to check Tag node: %v", err)
|
||||
}
|
||||
|
||||
if !tagResult.Next(ctx) {
|
||||
t.Error("Expected Tag node to be created for e-tag even when target doesn't exist")
|
||||
} else {
|
||||
t.Logf("✓ Tag node created for missing reference")
|
||||
}
|
||||
|
||||
// Verify no REFERENCES relationship was created from Tag (as the target Event doesn't exist)
|
||||
refCypher := `
|
||||
MATCH (e:Event {id: $eventId})-[:REFERENCES]->(ref:Event)
|
||||
MATCH (t:Tag {type: 'e', value: $refId})-[:REFERENCES]->(ref:Event)
|
||||
RETURN count(ref) AS refCount
|
||||
`
|
||||
refParams := map[string]any{"eventId": hex.Enc(ev.ID[:])}
|
||||
refParams := map[string]any{"refId": nonExistentEventID}
|
||||
|
||||
refResult, err := testDB.ExecuteRead(ctx, refCypher, refParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to check references: %v", err)
|
||||
t.Fatalf("Failed to check REFERENCES from Tag: %v", err)
|
||||
}
|
||||
|
||||
if refResult.Next(ctx) {
|
||||
count := refResult.Record().Values[0].(int64)
|
||||
if count > 0 {
|
||||
t.Errorf("Expected no REFERENCES relationship for non-existent event, got %d", count)
|
||||
t.Errorf("Expected no REFERENCES from Tag for non-existent event, got %d", count)
|
||||
} else {
|
||||
t.Logf("✓ Correctly handled missing reference (no relationship created)")
|
||||
t.Logf("✓ Correctly handled missing reference (no REFERENCES from Tag)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSaveEvent_MultipleETags tests events with multiple e-tags.
|
||||
// TestSaveEvent_MultipleETags tests events with multiple e-tags using Tag-based model.
|
||||
// Uses shared testDB from testmain_test.go to avoid auth rate limiting.
|
||||
func TestSaveEvent_MultipleETags(t *testing.T) {
|
||||
if testDB == nil {
|
||||
@@ -409,7 +431,7 @@ func TestSaveEvent_MultipleETags(t *testing.T) {
|
||||
t.Fatalf("Failed to sign reply event: %v", err)
|
||||
}
|
||||
|
||||
// Save reply event - tests batched e-tag creation
|
||||
// Save reply event - tests batched e-tag creation with Tag nodes
|
||||
exists, err := testDB.SaveEvent(ctx, replyEvent)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to save multi-reference event: %v", err)
|
||||
@@ -418,16 +440,17 @@ func TestSaveEvent_MultipleETags(t *testing.T) {
|
||||
t.Fatal("Reply event should not exist yet")
|
||||
}
|
||||
|
||||
// Verify all REFERENCES relationships were created
|
||||
// Verify all Tag-based REFERENCES relationships were created
|
||||
// Event-[:TAGGED_WITH]->Tag{type:'e'}-[:REFERENCES]->Event
|
||||
cypher := `
|
||||
MATCH (reply:Event {id: $replyId})-[:REFERENCES]->(ref:Event)
|
||||
MATCH (reply:Event {id: $replyId})-[:TAGGED_WITH]->(t:Tag {type: 'e'})-[:REFERENCES]->(ref:Event)
|
||||
RETURN ref.id AS refId
|
||||
`
|
||||
params := map[string]any{"replyId": hex.Enc(replyEvent.ID[:])}
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query REFERENCES relationships: %v", err)
|
||||
t.Fatalf("Failed to query Tag-based REFERENCES: %v", err)
|
||||
}
|
||||
|
||||
referencedIDs := make(map[string]bool)
|
||||
@@ -437,20 +460,20 @@ func TestSaveEvent_MultipleETags(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(referencedIDs) != 3 {
|
||||
t.Errorf("Expected 3 REFERENCES relationships, got %d", len(referencedIDs))
|
||||
t.Errorf("Expected 3 Tag-based REFERENCES, got %d", len(referencedIDs))
|
||||
}
|
||||
|
||||
for i, id := range eventIDs {
|
||||
if !referencedIDs[id] {
|
||||
t.Errorf("Missing REFERENCES relationship to event %d (%s)", i, id[:8])
|
||||
t.Errorf("Missing Tag-based REFERENCES to event %d (%s)", i, id[:8])
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("✓ All %d REFERENCES relationships created successfully", len(referencedIDs))
|
||||
t.Logf("✓ All %d Tag-based REFERENCES created successfully", len(referencedIDs))
|
||||
}
|
||||
|
||||
// TestSaveEvent_LargePTagBatch tests that events with many p-tags are saved correctly
|
||||
// using batched processing to avoid Neo4j stack overflow.
|
||||
// using batched Tag-based processing to avoid Neo4j stack overflow.
|
||||
// Uses shared testDB from testmain_test.go to avoid auth rate limiting.
|
||||
func TestSaveEvent_LargePTagBatch(t *testing.T) {
|
||||
if testDB == nil {
|
||||
@@ -498,24 +521,45 @@ func TestSaveEvent_LargePTagBatch(t *testing.T) {
|
||||
t.Fatal("Event should not exist yet")
|
||||
}
|
||||
|
||||
// Verify all MENTIONS relationships were created
|
||||
countCypher := `
|
||||
MATCH (e:Event {id: $eventId})-[:MENTIONS]->(u:NostrUser)
|
||||
RETURN count(u) AS mentionCount
|
||||
// Verify all Tag nodes were created with TAGGED_WITH relationships
|
||||
tagCountCypher := `
|
||||
MATCH (e:Event {id: $eventId})-[:TAGGED_WITH]->(t:Tag {type: 'p'})
|
||||
RETURN count(t) AS tagCount
|
||||
`
|
||||
countParams := map[string]any{"eventId": hex.Enc(ev.ID[:])}
|
||||
tagCountParams := map[string]any{"eventId": hex.Enc(ev.ID[:])}
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, countCypher, countParams)
|
||||
tagResult, err := testDB.ExecuteRead(ctx, tagCountCypher, tagCountParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to count MENTIONS: %v", err)
|
||||
t.Fatalf("Failed to count p-tag Tag nodes: %v", err)
|
||||
}
|
||||
|
||||
if result.Next(ctx) {
|
||||
count := result.Record().Values[0].(int64)
|
||||
if tagResult.Next(ctx) {
|
||||
count := tagResult.Record().Values[0].(int64)
|
||||
if count != int64(numTags) {
|
||||
t.Errorf("Expected %d MENTIONS relationships, got %d", numTags, count)
|
||||
t.Errorf("Expected %d Tag nodes, got %d", numTags, count)
|
||||
} else {
|
||||
t.Logf("✓ All %d MENTIONS relationships created via batched processing", count)
|
||||
t.Logf("✓ All %d p-tag Tag nodes created via batched processing", count)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all REFERENCES relationships to NostrUser were created
|
||||
refCountCypher := `
|
||||
MATCH (e:Event {id: $eventId})-[:TAGGED_WITH]->(t:Tag {type: 'p'})-[:REFERENCES]->(u:NostrUser)
|
||||
RETURN count(u) AS refCount
|
||||
`
|
||||
refCountParams := map[string]any{"eventId": hex.Enc(ev.ID[:])}
|
||||
|
||||
refResult, err := testDB.ExecuteRead(ctx, refCountCypher, refCountParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to count Tag-based REFERENCES to NostrUser: %v", err)
|
||||
}
|
||||
|
||||
if refResult.Next(ctx) {
|
||||
count := refResult.Record().Values[0].(int64)
|
||||
if count != int64(numTags) {
|
||||
t.Errorf("Expected %d REFERENCES to NostrUser, got %d", numTags, count)
|
||||
} else {
|
||||
t.Logf("✓ All %d Tag-based REFERENCES to NostrUser created via batched processing", count)
|
||||
}
|
||||
}
|
||||
}
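
The assertions in these tests all walk the same two-hop shape: `Event-[:TAGGED_WITH]->Tag{type}-[:REFERENCES]->target`, where the target is an `Event` for e-tags and a `NostrUser` for p-tags. A minimal sketch of that pattern follows. The helper `tagTraversal` is a hypothetical name used only for illustration; it is not part of `pkg/neo4j`.

```go
package main

import "fmt"

// tagTraversal returns the Cypher MATCH pattern used by the Tag-based model.
// Hypothetical helper for illustration only; the real queries in the tests
// above are written inline as string literals.
func tagTraversal(tagType string) (string, error) {
	var target string
	switch tagType {
	case "e":
		target = "Event" // e-tags reference other events
	case "p":
		target = "NostrUser" // p-tags reference users
	default:
		return "", fmt.Errorf("unsupported tag type %q", tagType)
	}
	return fmt.Sprintf(
		"MATCH (e:Event {id: $eventId})-[:TAGGED_WITH]->(t:Tag {type: '%s'})-[:REFERENCES]->(ref:%s)",
		tagType, target,
	), nil
}

func main() {
	for _, tagType := range []string{"e", "p"} {
		pattern, err := tagTraversal(tagType)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(pattern)
	}
}
```

The tag type is interpolated here only because it comes from a fixed switch; values that originate from events (IDs, pubkeys) should stay as query parameters, as they do in the tests.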
|
||||
|
||||
@@ -211,6 +211,8 @@ func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E)
|
||||
}
|
||||
|
||||
// processReport handles kind 1984 events (reports)
|
||||
// Deduplicates by (reporter, reported, report_type) - only one REPORTS relationship
|
||||
// per combination, with the most recent event's data preserved.
|
||||
func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error {
|
||||
reporterPubkey := hex.Enc(ev.Pubkey[:])
|
||||
eventID := hex.Enc(ev.ID[:])
|
||||
@@ -236,8 +238,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create REPORTS relationship
|
||||
// Note: WITH is required between CREATE and MERGE in Cypher
|
||||
// Check for existing report of the same type to determine if this is an update
|
||||
existingEventID, err := p.getExistingReportEvent(ctx, reporterPubkey, reportedPubkey, reportType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check existing report: %w", err)
|
||||
}
|
||||
|
||||
// Create REPORTS relationship with MERGE to deduplicate
|
||||
// MERGE on (reporter, reported, report_type) ensures only one relationship per combination
|
||||
cypher := `
|
||||
// Create event tracking node
|
||||
CREATE (evt:ProcessedSocialEvent {
|
||||
@@ -257,13 +265,18 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
MERGE (reporter:NostrUser {pubkey: $reporter_pubkey})
|
||||
MERGE (reported:NostrUser {pubkey: $reported_pubkey})
|
||||
|
||||
// Create REPORTS relationship
|
||||
CREATE (reporter)-[:REPORTS {
|
||||
created_by_event: $event_id,
|
||||
created_at: $created_at,
|
||||
relay_received_at: timestamp(),
|
||||
report_type: $report_type
|
||||
}]->(reported)
|
||||
// MERGE on (reporter, reported, report_type) - deduplicate!
|
||||
MERGE (reporter)-[r:REPORTS {report_type: $report_type}]->(reported)
|
||||
ON CREATE SET
|
||||
r.created_by_event = $event_id,
|
||||
r.created_at = $created_at,
|
||||
r.relay_received_at = timestamp()
|
||||
ON MATCH SET
|
||||
// Only update if this event is newer
|
||||
r.created_by_event = CASE WHEN $created_at > r.created_at
|
||||
THEN $event_id ELSE r.created_by_event END,
|
||||
r.created_at = CASE WHEN $created_at > r.created_at
|
||||
THEN $created_at ELSE r.created_at END
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
@@ -274,9 +287,14 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
"report_type": reportType,
|
||||
}
|
||||
|
||||
_, err := p.db.ExecuteWrite(ctx, cypher, params)
|
||||
_, err = p.db.ExecuteWrite(ctx, cypher, params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create report: %w", err)
|
||||
return fmt.Errorf("failed to create/update report: %w", err)
|
||||
}
|
||||
|
||||
// Mark old ProcessedSocialEvent as superseded if this is an update with newer data
|
||||
if existingEventID != "" && existingEventID != eventID {
|
||||
p.markReportEventSuperseded(ctx, existingEventID, eventID)
|
||||
}
|
||||
|
||||
p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s",
|
||||
@@ -285,6 +303,52 @@ func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) e
|
||||
return nil
|
||||
}
|
||||
|
||||
// getExistingReportEvent checks if a REPORTS relationship already exists for this combination
|
||||
// Returns the event ID that created the relationship, or empty string if none exists
|
||||
func (p *SocialEventProcessor) getExistingReportEvent(ctx context.Context, reporterPubkey, reportedPubkey, reportType string) (string, error) {
|
||||
cypher := `
|
||||
MATCH (reporter:NostrUser {pubkey: $reporter_pubkey})-[r:REPORTS {report_type: $report_type}]->(reported:NostrUser {pubkey: $reported_pubkey})
|
||||
RETURN r.created_by_event AS event_id
|
||||
LIMIT 1
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
"reporter_pubkey": reporterPubkey,
|
||||
"reported_pubkey": reportedPubkey,
|
||||
"report_type": reportType,
|
||||
}
|
||||
|
||||
result, err := p.db.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if result.Next(ctx) {
|
||||
record := result.Record()
|
||||
if eventID, ok := record.Values[0].(string); ok {
|
||||
return eventID, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// markReportEventSuperseded marks an older ProcessedSocialEvent as superseded by a newer one
|
||||
func (p *SocialEventProcessor) markReportEventSuperseded(ctx context.Context, oldEventID, newEventID string) {
|
||||
cypher := `
|
||||
MATCH (old:ProcessedSocialEvent {event_id: $old_event_id, event_kind: 1984})
|
||||
SET old.superseded_by = $new_event_id
|
||||
`
|
||||
|
||||
params := map[string]any{
|
||||
"old_event_id": oldEventID,
|
||||
"new_event_id": newEventID,
|
||||
}
|
||||
|
||||
// Ignore errors - old event may not exist
|
||||
p.db.ExecuteWrite(ctx, cypher, params)
|
||||
}
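
The deduplication rule above (one `REPORTS` relationship per reporter, reported, and report type, keeping the newest event's data) can be modelled without a database. The sketch below is an in-memory approximation of the `MERGE ... ON CREATE SET / ON MATCH SET` behaviour, assuming the semantics described in the comments of this diff. `reportGraph`, `reportKey`, and `apply` are hypothetical names, not part of the codebase.

```go
package main

import "fmt"

// reportKey mirrors the MERGE key: (reporter, reported, report_type).
type reportKey struct {
	reporter, reported, reportType string
}

// reportRel mirrors the properties stored on the REPORTS relationship.
type reportRel struct {
	createdByEvent string
	createdAt      int64
}

// reportGraph is a hypothetical in-memory stand-in for the Neo4j graph.
type reportGraph map[reportKey]reportRel

// apply imitates MERGE with ON CREATE SET / ON MATCH SET.
// It returns the event ID that was stored before the call ("" if none),
// which corresponds to what getExistingReportEvent looks up.
func (g reportGraph) apply(k reportKey, eventID string, createdAt int64) string {
	old, exists := g[k]
	if !exists {
		// ON CREATE: first report for this combination.
		g[k] = reportRel{createdByEvent: eventID, createdAt: createdAt}
		return ""
	}
	// ON MATCH: overwrite only when the incoming event is newer.
	if createdAt > old.createdAt {
		g[k] = reportRel{createdByEvent: eventID, createdAt: createdAt}
	}
	return old.createdByEvent
}

func main() {
	g := reportGraph{}
	spam := reportKey{"alice", "bob", "spam"}

	g.apply(spam, "ev1", 1000)
	g.apply(spam, "ev2", 2000) // newer: replaces ev1
	g.apply(spam, "ev0", 500)  // older: ignored
	g.apply(reportKey{"alice", "bob", "impersonation"}, "ev3", 1500)

	fmt.Println(len(g), g[spam].createdByEvent, g[spam].createdAt)
	// prints: 2 ev2 2000
}
```

In this model an older event arriving late leaves the relationship untouched. The diff's `processReport` marks the previous event as superseded whenever the IDs differ, so whether a late, older event should also trigger that marking is worth checking against the intended behaviour.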
|
||||
|
||||
// UpdateContactListParams holds parameters for contact list graph update
|
||||
type UpdateContactListParams struct {
|
||||
AuthorPubkey string
|
||||
|
||||
@@ -737,3 +737,264 @@ func BenchmarkDiffComputation(b *testing.B) {
|
||||
_, _ = diffStringSlices(old, new)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReportDeduplication tests that duplicate REPORTS are deduplicated
|
||||
func TestReportDeduplication(t *testing.T) {
|
||||
if testDB == nil {
|
||||
t.Skip("Neo4j not available")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("DeduplicateSameType", func(t *testing.T) {
|
||||
// Clean database for this subtest
|
||||
cleanTestDatabase()
|
||||
|
||||
reporter := generateTestKeypair(t, "reporter")
|
||||
reported := generateTestKeypair(t, "reported")
|
||||
|
||||
reporterPubkey := hex.Enc(reporter.pubkey[:])
|
||||
reportedPubkey := hex.Enc(reported.pubkey[:])
|
||||
|
||||
// Create first report (older timestamp)
|
||||
ev1 := event.New()
|
||||
ev1.Pubkey = reporter.pubkey
|
||||
ev1.CreatedAt = 1000
|
||||
ev1.Kind = 1984
|
||||
ev1.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "impersonation"),
|
||||
)
|
||||
ev1.Content = []byte("First report")
|
||||
|
||||
if err := ev1.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign first event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev1); err != nil {
|
||||
t.Fatalf("Failed to save first report: %v", err)
|
||||
}
|
||||
|
||||
// Create second report (newer timestamp, same type)
|
||||
ev2 := event.New()
|
||||
ev2.Pubkey = reporter.pubkey
|
||||
ev2.CreatedAt = 2000 // Newer timestamp
|
||||
ev2.Kind = 1984
|
||||
ev2.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "impersonation"),
|
||||
)
|
||||
ev2.Content = []byte("Second report")
|
||||
|
||||
if err := ev2.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign second event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev2); err != nil {
|
||||
t.Fatalf("Failed to save second report: %v", err)
|
||||
}
|
||||
|
||||
// Verify only ONE REPORTS relationship exists
|
||||
cypher := `
|
||||
MATCH (r:NostrUser {pubkey: $reporter})-[rel:REPORTS]->(d:NostrUser {pubkey: $reported})
|
||||
RETURN count(rel) AS count, rel.created_at AS created_at, rel.created_by_event AS event_id
|
||||
`
|
||||
params := map[string]any{
|
||||
"reporter": reporterPubkey,
|
||||
"reported": reportedPubkey,
|
||||
}
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query REPORTS: %v", err)
|
||||
}
|
||||
|
||||
if !result.Next(ctx) {
|
||||
t.Fatal("No REPORTS relationship found")
|
||||
}
|
||||
|
||||
record := result.Record()
|
||||
count := record.Values[0].(int64)
|
||||
createdAt := record.Values[1].(int64)
|
||||
eventID := record.Values[2].(string)
|
||||
|
||||
if count != 1 {
|
||||
t.Errorf("Expected 1 REPORTS relationship, got %d", count)
|
||||
}
|
||||
|
||||
// Verify the relationship has the newer event's data
|
||||
if createdAt != 2000 {
|
||||
t.Errorf("Expected created_at=2000 (newer), got %d", createdAt)
|
||||
}
|
||||
|
||||
ev2ID := hex.Enc(ev2.ID[:])
|
||||
if eventID != ev2ID {
|
||||
t.Errorf("Expected event_id=%s, got %s", ev2ID, eventID)
|
||||
}
|
||||
|
||||
t.Log("✓ Duplicate reports correctly deduplicated to single relationship with newest data")
|
||||
})
|
||||
|
||||
t.Run("DifferentTypesAllowed", func(t *testing.T) {
|
||||
// Clean database for this subtest
|
||||
cleanTestDatabase()
|
||||
|
||||
reporter := generateTestKeypair(t, "reporter2")
|
||||
reported := generateTestKeypair(t, "reported2")
|
||||
|
||||
reporterPubkey := hex.Enc(reporter.pubkey[:])
|
||||
reportedPubkey := hex.Enc(reported.pubkey[:])
|
||||
|
||||
// Report for impersonation
|
||||
ev1 := event.New()
|
||||
ev1.Pubkey = reporter.pubkey
|
||||
ev1.CreatedAt = 1000
|
||||
ev1.Kind = 1984
|
||||
ev1.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "impersonation"),
|
||||
)
|
||||
|
||||
if err := ev1.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev1); err != nil {
|
||||
t.Fatalf("Failed to save report: %v", err)
|
||||
}
|
||||
|
||||
// Report for spam (different type)
|
||||
ev2 := event.New()
|
||||
ev2.Pubkey = reporter.pubkey
|
||||
ev2.CreatedAt = 2000
|
||||
ev2.Kind = 1984
|
||||
ev2.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "spam"),
|
||||
)
|
||||
|
||||
if err := ev2.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev2); err != nil {
|
||||
t.Fatalf("Failed to save report: %v", err)
|
||||
}
|
||||
|
||||
// Verify TWO REPORTS relationships exist (different types)
|
||||
cypher := `
|
||||
MATCH (r:NostrUser {pubkey: $reporter})-[rel:REPORTS]->(d:NostrUser {pubkey: $reported})
|
||||
RETURN rel.report_type AS type ORDER BY type
|
||||
`
|
||||
params := map[string]any{
|
||||
"reporter": reporterPubkey,
|
||||
"reported": reportedPubkey,
|
||||
}
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query REPORTS: %v", err)
|
||||
}
|
||||
|
||||
var types []string
|
||||
for result.Next(ctx) {
|
||||
types = append(types, result.Record().Values[0].(string))
|
||||
}
|
||||
|
||||
if len(types) != 2 {
|
||||
t.Errorf("Expected 2 REPORTS relationships, got %d", len(types))
|
||||
}
|
||||
|
||||
if len(types) >= 2 && (types[0] != "impersonation" || types[1] != "spam") {
|
||||
t.Errorf("Expected [impersonation, spam], got %v", types)
|
||||
}
|
||||
|
||||
t.Log("✓ Different report types correctly create separate relationships")
|
||||
})
|
||||
|
||||
t.Run("SupersededEventTracking", func(t *testing.T) {
|
||||
// Clean database for this subtest
|
||||
cleanTestDatabase()
|
||||
|
||||
reporter := generateTestKeypair(t, "reporter3")
|
||||
reported := generateTestKeypair(t, "reported3")
|
||||
|
||||
reporterPubkey := hex.Enc(reporter.pubkey[:])
|
||||
reportedPubkey := hex.Enc(reported.pubkey[:])
|
||||
|
||||
// Create first report
|
||||
ev1 := event.New()
|
||||
ev1.Pubkey = reporter.pubkey
|
||||
ev1.CreatedAt = 1000
|
||||
ev1.Kind = 1984
|
||||
ev1.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "spam"),
|
||||
)
|
||||
|
||||
if err := ev1.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign first event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev1); err != nil {
|
||||
t.Fatalf("Failed to save first report: %v", err)
|
||||
}
|
||||
|
||||
ev1ID := hex.Enc(ev1.ID[:])
|
||||
|
||||
// Create second report (supersedes first)
|
||||
ev2 := event.New()
|
||||
ev2.Pubkey = reporter.pubkey
|
||||
ev2.CreatedAt = 2000
|
||||
ev2.Kind = 1984
|
||||
ev2.Tags = tag.NewS(
|
||||
tag.NewFromAny("p", reportedPubkey, "spam"),
|
||||
)
|
||||
|
||||
if err := ev2.Sign(reporter.signer); err != nil {
|
||||
t.Fatalf("Failed to sign second event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testDB.SaveEvent(ctx, ev2); err != nil {
|
||||
t.Fatalf("Failed to save second report: %v", err)
|
||||
}
|
||||
|
||||
ev2ID := hex.Enc(ev2.ID[:])
|
||||
|
||||
// Verify first ProcessedSocialEvent is superseded
|
||||
cypher := `
|
||||
MATCH (evt:ProcessedSocialEvent {event_id: $event_id, event_kind: 1984})
|
||||
RETURN evt.superseded_by AS superseded_by
|
||||
`
|
||||
params := map[string]any{"event_id": ev1ID}
|
||||
|
||||
result, err := testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query ProcessedSocialEvent: %v", err)
|
||||
}
|
||||
|
||||
if !result.Next(ctx) {
|
||||
t.Fatal("First ProcessedSocialEvent not found")
|
||||
}
|
||||
|
||||
supersededBy := result.Record().Values[0]
|
||||
if supersededBy == nil {
|
||||
t.Error("Expected first event to be superseded, but superseded_by is null")
|
||||
} else if supersededBy.(string) != ev2ID {
|
||||
t.Errorf("Expected superseded_by=%s, got %v", ev2ID, supersededBy)
|
||||
}
|
||||
|
||||
// Verify second ProcessedSocialEvent is NOT superseded
|
||||
params = map[string]any{"event_id": ev2ID}
|
||||
result, err = testDB.ExecuteRead(ctx, cypher, params)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query second ProcessedSocialEvent: %v", err)
|
||||
}
|
||||
|
||||
if !result.Next(ctx) {
|
||||
t.Fatal("Second ProcessedSocialEvent not found")
|
||||
}
|
||||
|
||||
supersededBy = result.Record().Values[0]
|
||||
if supersededBy != nil {
|
||||
t.Errorf("Expected second event not to be superseded, but superseded_by=%v", supersededBy)
|
||||
}
|
||||
|
||||
t.Log("✓ ProcessedSocialEvent correctly tracks superseded events")
|
||||
})
|
||||
}
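
One caveat about the `DeduplicateSameType` query: in Cypher, non-aggregated columns returned alongside `count(rel)` act as grouping keys, so `RETURN count(rel), rel.created_at, rel.created_by_event` yields one row per distinct pair rather than a single total. If two `REPORTS` relationships with different timestamps existed, each row would still report a count of 1. The sketch below imitates that grouping in plain Go; `rel`, `groupedCounts`, and `totalCount` are hypothetical names used only to illustrate the effect.

```go
package main

import "fmt"

// rel holds the two relationship properties returned next to count(rel).
type rel struct {
	createdAt int64
	eventID   string
}

// groupedCounts imitates `RETURN count(rel), rel.created_at, rel.created_by_event`:
// each distinct (created_at, created_by_event) pair becomes its own row.
func groupedCounts(rels []rel) map[rel]int64 {
	rows := make(map[rel]int64)
	for _, r := range rels {
		rows[r]++
	}
	return rows
}

// totalCount imitates `RETURN count(rel)` with no other columns.
func totalCount(rels []rel) int64 {
	return int64(len(rels))
}

func main() {
	// Two duplicate relationships that deduplication failed to merge.
	duplicates := []rel{{1000, "ev1"}, {2000, "ev2"}}

	rows := groupedCounts(duplicates)
	fmt.Println("rows:", len(rows), "total:", totalCount(duplicates))
	for key, n := range rows {
		fmt.Println(key.eventID, "count:", n)
	}
}
```

A stricter check would either return `count(rel)` alone or assert that the result set has exactly one row before reading the properties.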
|
||||
|
||||
1105
pkg/neo4j/tag_model_test.go
Normal file
File diff suppressed because it is too large
@@ -1 +1 @@
|
||||
v0.35.5
|
||||
v0.36.1
|
||||
|
||||