Files
next.orly.dev/pkg/neo4j/social-event-processor.go

627 lines
18 KiB
Go

package neo4j
import (
"context"
"encoding/json"
"fmt"
"sort"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/hex"
)
// SocialEventProcessor handles kind 0, 3, 1984, 10000 events for social graph management
type SocialEventProcessor struct {
db *N
}
// NewSocialEventProcessor creates a new social event processor
func NewSocialEventProcessor(db *N) *SocialEventProcessor {
return &SocialEventProcessor{db: db}
}
// ProcessedSocialEvent represents a processed social graph event in Neo4j
type ProcessedSocialEvent struct {
EventID string
EventKind int
Pubkey string
CreatedAt int64
ProcessedAt int64
RelationshipCount int
SupersededBy *string // nil if still active
}
// ProcessSocialEvent routes events to appropriate handlers based on kind
func (p *SocialEventProcessor) ProcessSocialEvent(ctx context.Context, ev *event.E) error {
switch ev.Kind {
case 0:
return p.processProfileMetadata(ctx, ev)
case 3:
return p.processContactList(ctx, ev)
case 1984:
return p.processReport(ctx, ev)
case 10000:
return p.processMuteList(ctx, ev)
default:
return fmt.Errorf("unsupported social event kind: %d", ev.Kind)
}
}
// processProfileMetadata handles kind 0 events (profile metadata)
func (p *SocialEventProcessor) processProfileMetadata(ctx context.Context, ev *event.E) error {
pubkey := hex.Enc(ev.Pubkey[:])
eventID := hex.Enc(ev.ID[:])
// Parse profile JSON from content
var profile map[string]interface{}
if err := json.Unmarshal(ev.Content, &profile); err != nil {
p.db.Logger.Warningf("invalid profile JSON in event %s: %v", eventID, err)
return nil // Don't fail, just skip profile update
}
// Update NostrUser node with profile data
cypher := `
MERGE (user:NostrUser {pubkey: $pubkey})
ON CREATE SET
user.created_at = timestamp(),
user.first_seen_event = $event_id
ON MATCH SET
user.last_profile_update = $created_at
SET
user.name = $name,
user.about = $about,
user.picture = $picture,
user.nip05 = $nip05,
user.lud16 = $lud16,
user.display_name = $display_name,
user.npub = $npub
`
params := map[string]any{
"pubkey": pubkey,
"event_id": eventID,
"created_at": ev.CreatedAt,
"name": getStringFromMap(profile, "name"),
"about": getStringFromMap(profile, "about"),
"picture": getStringFromMap(profile, "picture"),
"nip05": getStringFromMap(profile, "nip05"),
"lud16": getStringFromMap(profile, "lud16"),
"display_name": getStringFromMap(profile, "display_name"),
"npub": "", // TODO: compute npub from pubkey
}
_, err := p.db.ExecuteWrite(ctx, cypher, params)
if err != nil {
return fmt.Errorf("failed to update profile: %w", err)
}
p.db.Logger.Infof("updated profile for user %s", pubkey[:16])
return nil
}
// processContactList handles kind 3 events (follow lists)
func (p *SocialEventProcessor) processContactList(ctx context.Context, ev *event.E) error {
authorPubkey := hex.Enc(ev.Pubkey[:])
eventID := hex.Enc(ev.ID[:])
// 1. Check for existing contact list
existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 3)
if err != nil {
return fmt.Errorf("failed to check existing contact list: %w", err)
}
// 2. Reject if this event is older than existing
if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
p.db.Logger.Infof("rejecting older contact list event %s (existing: %s)",
eventID[:16], existingEvent.EventID[:16])
return nil // Not an error, just skip
}
// 3. Extract p-tags to get new follows list
newFollows := extractPTags(ev)
// 4. Get old follows list if replacing an existing event
var oldFollows []string
var oldEventID string
if existingEvent != nil {
oldEventID = existingEvent.EventID
oldFollows, err = p.getFollowsForEvent(ctx, oldEventID)
if err != nil {
return fmt.Errorf("failed to get old follows: %w", err)
}
}
// 5. Compute diff
added, removed := diffStringSlices(oldFollows, newFollows)
// 6. Update graph in transaction
err = p.updateContactListGraph(ctx, UpdateContactListParams{
AuthorPubkey: authorPubkey,
NewEventID: eventID,
OldEventID: oldEventID,
CreatedAt: ev.CreatedAt,
AddedFollows: added,
RemovedFollows: removed,
TotalFollows: len(newFollows),
})
if err != nil {
return fmt.Errorf("failed to update contact list graph: %w", err)
}
p.db.Logger.Infof("processed contact list: author=%s, event=%s, added=%d, removed=%d, total=%d",
authorPubkey[:16], eventID[:16], len(added), len(removed), len(newFollows))
return nil
}
// processMuteList handles kind 10000 events (mute lists)
func (p *SocialEventProcessor) processMuteList(ctx context.Context, ev *event.E) error {
authorPubkey := hex.Enc(ev.Pubkey[:])
eventID := hex.Enc(ev.ID[:])
// Check for existing mute list
existingEvent, err := p.getLatestSocialEvent(ctx, authorPubkey, 10000)
if err != nil {
return fmt.Errorf("failed to check existing mute list: %w", err)
}
// Reject if older
if existingEvent != nil && existingEvent.CreatedAt >= ev.CreatedAt {
p.db.Logger.Infof("rejecting older mute list event %s", eventID[:16])
return nil
}
// Extract p-tags
newMutes := extractPTags(ev)
// Get old mutes
var oldMutes []string
var oldEventID string
if existingEvent != nil {
oldEventID = existingEvent.EventID
oldMutes, err = p.getMutesForEvent(ctx, oldEventID)
if err != nil {
return fmt.Errorf("failed to get old mutes: %w", err)
}
}
// Compute diff
added, removed := diffStringSlices(oldMutes, newMutes)
// Update graph
err = p.updateMuteListGraph(ctx, UpdateMuteListParams{
AuthorPubkey: authorPubkey,
NewEventID: eventID,
OldEventID: oldEventID,
CreatedAt: ev.CreatedAt,
AddedMutes: added,
RemovedMutes: removed,
TotalMutes: len(newMutes),
})
if err != nil {
return fmt.Errorf("failed to update mute list graph: %w", err)
}
p.db.Logger.Infof("processed mute list: author=%s, event=%s, added=%d, removed=%d",
authorPubkey[:16], eventID[:16], len(added), len(removed))
return nil
}
// processReport handles kind 1984 events (reports)
func (p *SocialEventProcessor) processReport(ctx context.Context, ev *event.E) error {
reporterPubkey := hex.Enc(ev.Pubkey[:])
eventID := hex.Enc(ev.ID[:])
// Extract report target and type from tags
// Format: ["p", "reported_pubkey", "report_type"]
var reportedPubkey string
var reportType string = "other" // default
for _, tag := range *ev.Tags {
if len(tag.T) >= 2 && string(tag.T[0]) == "p" {
reportedPubkey = string(tag.T[1])
if len(tag.T) >= 3 {
reportType = string(tag.T[2])
}
break // Use first p-tag
}
}
if reportedPubkey == "" {
p.db.Logger.Warningf("report event %s has no p-tag, skipping", eventID[:16])
return nil
}
// Create REPORTS relationship
// Note: WITH is required between CREATE and MERGE in Cypher
cypher := `
// Create event tracking node
CREATE (evt:ProcessedSocialEvent {
event_id: $event_id,
event_kind: 1984,
pubkey: $reporter_pubkey,
created_at: $created_at,
processed_at: timestamp(),
relationship_count: 1,
superseded_by: null
})
// WITH required to transition from CREATE to MERGE
WITH evt
// Create or get reporter and reported users
MERGE (reporter:NostrUser {pubkey: $reporter_pubkey})
MERGE (reported:NostrUser {pubkey: $reported_pubkey})
// Create REPORTS relationship
CREATE (reporter)-[:REPORTS {
created_by_event: $event_id,
created_at: $created_at,
relay_received_at: timestamp(),
report_type: $report_type
}]->(reported)
`
params := map[string]any{
"event_id": eventID,
"reporter_pubkey": reporterPubkey,
"reported_pubkey": reportedPubkey,
"created_at": ev.CreatedAt,
"report_type": reportType,
}
_, err := p.db.ExecuteWrite(ctx, cypher, params)
if err != nil {
return fmt.Errorf("failed to create report: %w", err)
}
p.db.Logger.Infof("processed report: reporter=%s, reported=%s, type=%s",
reporterPubkey[:16], reportedPubkey[:16], reportType)
return nil
}
// UpdateContactListParams holds parameters for contact list graph update
type UpdateContactListParams struct {
AuthorPubkey string
NewEventID string
OldEventID string
CreatedAt int64
AddedFollows []string
RemovedFollows []string
TotalFollows int
}
// updateContactListGraph performs atomic graph update for contact list changes
func (p *SocialEventProcessor) updateContactListGraph(ctx context.Context, params UpdateContactListParams) error {
// Note: WITH is required between CREATE and MERGE in Cypher
cypher := `
// Mark old event as superseded (if exists)
OPTIONAL MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
SET old.superseded_by = $new_event_id
// Create new event tracking node
// WITH required after OPTIONAL MATCH + SET before CREATE
WITH old
CREATE (new:ProcessedSocialEvent {
event_id: $new_event_id,
event_kind: 3,
pubkey: $author_pubkey,
created_at: $created_at,
processed_at: timestamp(),
relationship_count: $total_follows,
superseded_by: null
})
// WITH required to transition from CREATE to MERGE
WITH new
// Get or create author node
MERGE (author:NostrUser {pubkey: $author_pubkey})
// Update unchanged FOLLOWS relationships to point to new event
// (so they remain visible when filtering by non-superseded events)
WITH author
OPTIONAL MATCH (author)-[unchanged:FOLLOWS]->(followed:NostrUser)
WHERE unchanged.created_by_event = $old_event_id
AND NOT followed.pubkey IN $removed_follows
SET unchanged.created_by_event = $new_event_id,
unchanged.created_at = $created_at
// Remove old FOLLOWS relationships for removed follows
WITH author
OPTIONAL MATCH (author)-[old_follows:FOLLOWS]->(followed:NostrUser)
WHERE old_follows.created_by_event = $old_event_id
AND followed.pubkey IN $removed_follows
DELETE old_follows
// Create new FOLLOWS relationships for added follows
WITH author
UNWIND $added_follows AS followed_pubkey
MERGE (followed:NostrUser {pubkey: followed_pubkey})
MERGE (author)-[new_follows:FOLLOWS]->(followed)
ON CREATE SET
new_follows.created_by_event = $new_event_id,
new_follows.created_at = $created_at,
new_follows.relay_received_at = timestamp()
ON MATCH SET
new_follows.created_by_event = $new_event_id,
new_follows.created_at = $created_at
`
cypherParams := map[string]any{
"author_pubkey": params.AuthorPubkey,
"new_event_id": params.NewEventID,
"old_event_id": params.OldEventID,
"created_at": params.CreatedAt,
"total_follows": params.TotalFollows,
"added_follows": params.AddedFollows,
"removed_follows": params.RemovedFollows,
}
_, err := p.db.ExecuteWrite(ctx, cypher, cypherParams)
return err
}
// UpdateMuteListParams holds parameters for mute list graph update
type UpdateMuteListParams struct {
AuthorPubkey string
NewEventID string
OldEventID string
CreatedAt int64
AddedMutes []string
RemovedMutes []string
TotalMutes int
}
// updateMuteListGraph performs atomic graph update for mute list changes
func (p *SocialEventProcessor) updateMuteListGraph(ctx context.Context, params UpdateMuteListParams) error {
// Note: WITH is required between CREATE and MERGE in Cypher
cypher := `
// Mark old event as superseded (if exists)
OPTIONAL MATCH (old:ProcessedSocialEvent {event_id: $old_event_id})
SET old.superseded_by = $new_event_id
// Create new event tracking node
// WITH required after OPTIONAL MATCH + SET before CREATE
WITH old
CREATE (new:ProcessedSocialEvent {
event_id: $new_event_id,
event_kind: 10000,
pubkey: $author_pubkey,
created_at: $created_at,
processed_at: timestamp(),
relationship_count: $total_mutes,
superseded_by: null
})
// WITH required to transition from CREATE to MERGE
WITH new
// Get or create author node
MERGE (author:NostrUser {pubkey: $author_pubkey})
// Update unchanged MUTES relationships to point to new event
WITH author
OPTIONAL MATCH (author)-[unchanged:MUTES]->(muted:NostrUser)
WHERE unchanged.created_by_event = $old_event_id
AND NOT muted.pubkey IN $removed_mutes
SET unchanged.created_by_event = $new_event_id,
unchanged.created_at = $created_at
// Remove old MUTES relationships
WITH author
OPTIONAL MATCH (author)-[old_mutes:MUTES]->(muted:NostrUser)
WHERE old_mutes.created_by_event = $old_event_id
AND muted.pubkey IN $removed_mutes
DELETE old_mutes
// Create new MUTES relationships
WITH author
UNWIND $added_mutes AS muted_pubkey
MERGE (muted:NostrUser {pubkey: muted_pubkey})
MERGE (author)-[new_mutes:MUTES]->(muted)
ON CREATE SET
new_mutes.created_by_event = $new_event_id,
new_mutes.created_at = $created_at,
new_mutes.relay_received_at = timestamp()
ON MATCH SET
new_mutes.created_by_event = $new_event_id,
new_mutes.created_at = $created_at
`
cypherParams := map[string]any{
"author_pubkey": params.AuthorPubkey,
"new_event_id": params.NewEventID,
"old_event_id": params.OldEventID,
"created_at": params.CreatedAt,
"total_mutes": params.TotalMutes,
"added_mutes": params.AddedMutes,
"removed_mutes": params.RemovedMutes,
}
_, err := p.db.ExecuteWrite(ctx, cypher, cypherParams)
return err
}
// getLatestSocialEvent retrieves the most recent non-superseded event of a given kind for a pubkey
func (p *SocialEventProcessor) getLatestSocialEvent(ctx context.Context, pubkey string, kind int) (*ProcessedSocialEvent, error) {
cypher := `
MATCH (evt:ProcessedSocialEvent {pubkey: $pubkey, event_kind: $kind})
WHERE evt.superseded_by IS NULL
RETURN evt.event_id AS event_id,
evt.created_at AS created_at,
evt.relationship_count AS relationship_count
ORDER BY evt.created_at DESC
LIMIT 1
`
params := map[string]any{
"pubkey": pubkey,
"kind": kind,
}
result, err := p.db.ExecuteRead(ctx, cypher, params)
if err != nil {
return nil, err
}
if result.Next(ctx) {
record := result.Record()
return &ProcessedSocialEvent{
EventID: record.Values[0].(string),
CreatedAt: record.Values[1].(int64),
RelationshipCount: int(record.Values[2].(int64)),
}, nil
}
return nil, nil // No existing event
}
// getFollowsForEvent retrieves the list of followed pubkeys for a specific event
func (p *SocialEventProcessor) getFollowsForEvent(ctx context.Context, eventID string) ([]string, error) {
cypher := `
MATCH (author:NostrUser)-[f:FOLLOWS]->(followed:NostrUser)
WHERE f.created_by_event = $event_id
RETURN collect(followed.pubkey) AS pubkeys
`
params := map[string]any{
"event_id": eventID,
}
result, err := p.db.ExecuteRead(ctx, cypher, params)
if err != nil {
return nil, err
}
if result.Next(ctx) {
record := result.Record()
pubkeysRaw := record.Values[0].([]interface{})
pubkeys := make([]string, len(pubkeysRaw))
for i, p := range pubkeysRaw {
pubkeys[i] = p.(string)
}
return pubkeys, nil
}
return []string{}, nil
}
// getMutesForEvent retrieves the list of muted pubkeys for a specific event
func (p *SocialEventProcessor) getMutesForEvent(ctx context.Context, eventID string) ([]string, error) {
cypher := `
MATCH (author:NostrUser)-[m:MUTES]->(muted:NostrUser)
WHERE m.created_by_event = $event_id
RETURN collect(muted.pubkey) AS pubkeys
`
params := map[string]any{
"event_id": eventID,
}
result, err := p.db.ExecuteRead(ctx, cypher, params)
if err != nil {
return nil, err
}
if result.Next(ctx) {
record := result.Record()
pubkeysRaw := record.Values[0].([]interface{})
pubkeys := make([]string, len(pubkeysRaw))
for i, p := range pubkeysRaw {
pubkeys[i] = p.(string)
}
return pubkeys, nil
}
return []string{}, nil
}
// BatchProcessContactLists processes multiple contact list events in order
func (p *SocialEventProcessor) BatchProcessContactLists(ctx context.Context, events []*event.E) error {
// Group by author
byAuthor := make(map[string][]*event.E)
for _, ev := range events {
if ev.Kind != 3 {
continue
}
pubkey := hex.Enc(ev.Pubkey[:])
byAuthor[pubkey] = append(byAuthor[pubkey], ev)
}
// Process each author's events in chronological order
for pubkey, authorEvents := range byAuthor {
// Sort by created_at (oldest first)
sort.Slice(authorEvents, func(i, j int) bool {
return authorEvents[i].CreatedAt < authorEvents[j].CreatedAt
})
// Process in order
for _, ev := range authorEvents {
if err := p.processContactList(ctx, ev); err != nil {
return fmt.Errorf("batch process failed for %s: %w", pubkey, err)
}
}
}
return nil
}
// Helper functions
// extractPTags extracts unique pubkeys from p-tags
func extractPTags(ev *event.E) []string {
seen := make(map[string]bool)
var pubkeys []string
for _, tag := range *ev.Tags {
if len(tag.T) >= 2 && string(tag.T[0]) == "p" {
pubkey := string(tag.T[1])
if len(pubkey) == 64 && !seen[pubkey] { // Basic validation: 64 hex chars
seen[pubkey] = true
pubkeys = append(pubkeys, pubkey)
}
}
}
return pubkeys
}
// diffStringSlices computes added and removed elements between old and new slices
func diffStringSlices(old, new []string) (added, removed []string) {
oldSet := make(map[string]bool)
for _, s := range old {
oldSet[s] = true
}
newSet := make(map[string]bool)
for _, s := range new {
newSet[s] = true
if !oldSet[s] {
added = append(added, s)
}
}
for _, s := range old {
if !newSet[s] {
removed = append(removed, s)
}
}
return
}
// getStringFromMap safely extracts a string value from a map
func getStringFromMap(m map[string]interface{}, key string) string {
if val, ok := m[key]; ok {
if str, ok := val.(string); ok {
return str
}
}
return ""
}