Add curation ACL mode and complete graph query implementation (v0.47.0)
Some checks failed
Go / build-and-release (push) Has been cancelled

Curation Mode:
- Three-tier publisher classification: Trusted, Blacklisted, Unclassified
- Per-pubkey rate limiting (default 50/day) for unclassified users
- IP flood protection (default 500/day) with automatic banning
- Event kind allow-listing via categories, ranges, and custom kinds
- Query filtering hides blacklisted pubkey events (admin/owner exempt)
- Web UI for managing trusted/blacklisted pubkeys and configuration
- NIP-86 API endpoints for all curation management operations

Graph Query Extension:
- Complete reference aggregation for Badger and Neo4j backends
- E-tag graph backfill migration (v8) runs automatically on startup
- Configuration options: ORLY_GRAPH_QUERIES_ENABLED, MAX_DEPTH, etc.
- NIP-11 advertisement of graph query capabilities

Files modified:
- app/handle-nip86-curating.go: NIP-86 curation API handlers (new)
- app/web/src/CurationView.svelte: Curation management UI (new)
- app/web/src/kindCategories.js: Kind category definitions (new)
- pkg/acl/curating.go: Curating ACL implementation (new)
- pkg/database/curating-acl.go: Database layer for curation (new)
- pkg/neo4j/graph-refs.go: Neo4j ref collection (new)
- pkg/database/migrations.go: E-tag graph backfill migration
- pkg/protocol/graph/executor.go: Reference aggregation support
- app/handle-event.go: Curation config event processing
- app/handle-req.go: Blacklist filtering for queries
- docs/GRAPH_QUERIES_REMAINING_PLAN.md: Updated completion status

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
woikos
2026-01-05 21:42:17 +01:00
parent ea7bc75fac
commit 047cdf3472
28 changed files with 5350 additions and 35 deletions

699
pkg/acl/curating.go Normal file
View File

@@ -0,0 +1,699 @@
package acl
import (
"context"
"encoding/hex"
"reflect"
"strconv"
"strings"
"sync"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/database"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
"git.mleku.dev/mleku/nostr/encoders/event"
"next.orly.dev/pkg/utils"
)
// Default values for curating mode
const (
DefaultDailyLimit = 50
DefaultIPDailyLimit = 500 // Max events per IP per day (flood protection)
DefaultFirstBanHours = 1
DefaultSecondBanHours = 168 // 1 week
CuratingConfigKind = 30078
CuratingConfigDTag = "curating-config"
)
// Curating implements the curating ACL mode with three-tier publisher classification:
// - Trusted: Unlimited publishing
// - Blacklisted: Cannot publish
// - Unclassified: Rate-limited publishing (default 50/day)
type Curating struct {
Ctx context.Context
cfg *config.C
db *database.D
curatingACL *database.CuratingACL
owners [][]byte
admins [][]byte
mx sync.RWMutex
// In-memory caches for performance
trustedCache map[string]bool
blacklistedCache map[string]bool
kindCache map[int]bool
configCache *database.CuratingConfig
cacheMx sync.RWMutex
}
func (c *Curating) Configure(cfg ...any) (err error) {
log.I.F("configuring curating ACL")
for _, ca := range cfg {
switch cv := ca.(type) {
case *config.C:
c.cfg = cv
case *database.D:
c.db = cv
c.curatingACL = database.NewCuratingACL(cv)
case context.Context:
c.Ctx = cv
default:
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
}
}
if c.cfg == nil || c.db == nil {
err = errorf.E("both config and database must be set")
return
}
// Initialize caches
c.trustedCache = make(map[string]bool)
c.blacklistedCache = make(map[string]bool)
c.kindCache = make(map[int]bool)
// Load owners from config
for _, owner := range c.cfg.Owners {
var own []byte
if o, e := bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(e) {
continue
} else {
own = o
}
c.owners = append(c.owners, own)
}
// Load admins from config
for _, admin := range c.cfg.Admins {
var adm []byte
if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
continue
} else {
adm = a
}
c.admins = append(c.admins, adm)
}
// Refresh caches from database
if err = c.RefreshCaches(); err != nil {
log.W.F("curating ACL: failed to refresh caches: %v", err)
}
return nil
}
func (c *Curating) GetAccessLevel(pub []byte, address string) (level string) {
c.mx.RLock()
defer c.mx.RUnlock()
pubkeyHex := hex.EncodeToString(pub)
// Check owners first
for _, v := range c.owners {
if utils.FastEqual(v, pub) {
return "owner"
}
}
// Check admins
for _, v := range c.admins {
if utils.FastEqual(v, pub) {
return "admin"
}
}
// Check if IP is blocked
if address != "" {
blocked, _, err := c.curatingACL.IsIPBlocked(address)
if err == nil && blocked {
return "blocked"
}
}
// Check if pubkey is blacklisted (check cache first)
c.cacheMx.RLock()
if c.blacklistedCache[pubkeyHex] {
c.cacheMx.RUnlock()
return "banned"
}
c.cacheMx.RUnlock()
// Double-check database for blacklisted
blacklisted, _ := c.curatingACL.IsPubkeyBlacklisted(pubkeyHex)
if blacklisted {
// Update cache
c.cacheMx.Lock()
c.blacklistedCache[pubkeyHex] = true
c.cacheMx.Unlock()
return "banned"
}
// All other users get write access (rate limiting handled in CheckPolicy)
return "write"
}
// CheckPolicy implements the PolicyChecker interface for event-level filtering
func (c *Curating) CheckPolicy(ev *event.E) (allowed bool, err error) {
pubkeyHex := hex.EncodeToString(ev.Pubkey)
// Check if configured
config, err := c.GetConfig()
if err != nil {
return false, errorf.E("failed to get config: %v", err)
}
if config.ConfigEventID == "" {
return false, errorf.E("curating mode not configured: please publish a configuration event")
}
// Check if event is spam-flagged
isSpam, _ := c.curatingACL.IsEventSpam(hex.EncodeToString(ev.ID[:]))
if isSpam {
return false, errorf.E("blocked: event is flagged as spam")
}
// Check if event kind is allowed
if !c.curatingACL.IsKindAllowed(int(ev.Kind), &config) {
return false, errorf.E("blocked: event kind %d is not in the allow list", ev.Kind)
}
// Check if pubkey is blacklisted
c.cacheMx.RLock()
isBlacklisted := c.blacklistedCache[pubkeyHex]
c.cacheMx.RUnlock()
if !isBlacklisted {
isBlacklisted, _ = c.curatingACL.IsPubkeyBlacklisted(pubkeyHex)
}
if isBlacklisted {
return false, errorf.E("blocked: pubkey is blacklisted")
}
// Check if pubkey is trusted (bypass rate limiting)
c.cacheMx.RLock()
isTrusted := c.trustedCache[pubkeyHex]
c.cacheMx.RUnlock()
if !isTrusted {
isTrusted, _ = c.curatingACL.IsPubkeyTrusted(pubkeyHex)
if isTrusted {
// Update cache
c.cacheMx.Lock()
c.trustedCache[pubkeyHex] = true
c.cacheMx.Unlock()
}
}
if isTrusted {
return true, nil
}
// Check if owner or admin (bypass rate limiting)
for _, v := range c.owners {
if utils.FastEqual(v, ev.Pubkey) {
return true, nil
}
}
for _, v := range c.admins {
if utils.FastEqual(v, ev.Pubkey) {
return true, nil
}
}
// For unclassified users, check rate limit
today := time.Now().Format("2006-01-02")
dailyLimit := config.DailyLimit
if dailyLimit == 0 {
dailyLimit = DefaultDailyLimit
}
count, err := c.curatingACL.GetEventCount(pubkeyHex, today)
if err != nil {
log.W.F("curating ACL: failed to get event count: %v", err)
count = 0
}
if count >= dailyLimit {
return false, errorf.E("rate limit exceeded: maximum %d events per day for unclassified users", dailyLimit)
}
// Increment the counter
_, err = c.curatingACL.IncrementEventCount(pubkeyHex, today)
if err != nil {
log.W.F("curating ACL: failed to increment event count: %v", err)
}
return true, nil
}
// RateLimitCheck checks if an unclassified user can publish and handles IP tracking
// This is called separately when we have access to the IP address
func (c *Curating) RateLimitCheck(pubkeyHex, ip string) (allowed bool, message string, err error) {
config, err := c.GetConfig()
if err != nil {
return false, "", errorf.E("failed to get config: %v", err)
}
today := time.Now().Format("2006-01-02")
// Check IP flood limit first (applies to all non-trusted users from this IP)
if ip != "" {
ipDailyLimit := config.IPDailyLimit
if ipDailyLimit == 0 {
ipDailyLimit = DefaultIPDailyLimit
}
ipCount, err := c.curatingACL.GetIPEventCount(ip, today)
if err != nil {
ipCount = 0
}
if ipCount >= ipDailyLimit {
// IP has exceeded flood limit - record offense and ban
c.recordIPOffenseAndBan(ip, pubkeyHex, config, "IP flood limit exceeded")
return false, "rate limit exceeded: too many events from this IP address", nil
}
}
// Check per-pubkey daily limit
dailyLimit := config.DailyLimit
if dailyLimit == 0 {
dailyLimit = DefaultDailyLimit
}
count, err := c.curatingACL.GetEventCount(pubkeyHex, today)
if err != nil {
count = 0
}
if count >= dailyLimit {
// Record IP offense and potentially ban
if ip != "" {
c.recordIPOffenseAndBan(ip, pubkeyHex, config, "pubkey rate limit exceeded")
}
return false, "rate limit exceeded: maximum events per day for unclassified users", nil
}
// Increment IP event count for flood tracking (only for non-trusted users)
if ip != "" {
_, _ = c.curatingACL.IncrementIPEventCount(ip, today)
}
return true, "", nil
}
// recordIPOffenseAndBan records an offense for an IP and applies a ban if warranted
func (c *Curating) recordIPOffenseAndBan(ip, pubkeyHex string, config database.CuratingConfig, reason string) {
offenseCount, _ := c.curatingACL.RecordIPOffense(ip, pubkeyHex)
if offenseCount > 0 {
firstBanHours := config.FirstBanHours
if firstBanHours == 0 {
firstBanHours = DefaultFirstBanHours
}
secondBanHours := config.SecondBanHours
if secondBanHours == 0 {
secondBanHours = DefaultSecondBanHours
}
var banDuration time.Duration
if offenseCount >= 2 {
banDuration = time.Duration(secondBanHours) * time.Hour
log.W.F("curating ACL: IP %s banned for %d hours (offense #%d, reason: %s)", ip, secondBanHours, offenseCount, reason)
} else {
banDuration = time.Duration(firstBanHours) * time.Hour
log.W.F("curating ACL: IP %s banned for %d hours (offense #%d, reason: %s)", ip, firstBanHours, offenseCount, reason)
}
c.curatingACL.BlockIP(ip, banDuration, reason)
}
}
func (c *Curating) GetACLInfo() (name, description, documentation string) {
return "curating", "curated relay with rate-limited unclassified publishers",
`Curating ACL mode provides three-tier publisher classification:
- Trusted: Unlimited publishing, explicitly marked by admin
- Blacklisted: Cannot publish, events rejected
- Unclassified: Default state, rate-limited (default 50 events/day)
Features:
- Per-pubkey daily rate limiting for unclassified users (default 50/day)
- Per-IP daily rate limiting for flood protection (default 500/day)
- IP-based spam detection (tracks multiple rate-limited pubkeys)
- Automatic IP bans (1-hour first offense, 1-week second offense)
- Event kind allow-listing for content control
- Spam flagging (events hidden from queries without deletion)
Configuration via kind 30078 event with d-tag "curating-config".
The relay will not accept events until configured.
Management through NIP-86 API endpoints:
- trustpubkey, untrustpubkey, listtrustedpubkeys
- blacklistpubkey, unblacklistpubkey, listblacklistedpubkeys
- listunclassifiedusers
- markspam, unmarkspam, listspamevents
- setallowedkindcategories, getallowedkindcategories`
}
func (c *Curating) Type() string { return "curating" }
// IsEventVisible checks if an event should be visible to the given access level.
// Events from blacklisted pubkeys are only visible to admin/owner.
func (c *Curating) IsEventVisible(ev *event.E, accessLevel string) bool {
// Admin and owner can see all events
if accessLevel == "admin" || accessLevel == "owner" {
return true
}
// Check if the event author is blacklisted
pubkeyHex := hex.EncodeToString(ev.Pubkey)
// Check cache first
c.cacheMx.RLock()
isBlacklisted := c.blacklistedCache[pubkeyHex]
c.cacheMx.RUnlock()
if isBlacklisted {
return false
}
// Check database if not in cache
if blacklisted, _ := c.curatingACL.IsPubkeyBlacklisted(pubkeyHex); blacklisted {
c.cacheMx.Lock()
c.blacklistedCache[pubkeyHex] = true
c.cacheMx.Unlock()
return false
}
return true
}
// FilterVisibleEvents filters a list of events, removing those from blacklisted pubkeys.
// Returns only events visible to the given access level.
func (c *Curating) FilterVisibleEvents(events []*event.E, accessLevel string) []*event.E {
// Admin and owner can see all events
if accessLevel == "admin" || accessLevel == "owner" {
return events
}
// Filter out events from blacklisted pubkeys
visible := make([]*event.E, 0, len(events))
for _, ev := range events {
if c.IsEventVisible(ev, accessLevel) {
visible = append(visible, ev)
}
}
return visible
}
// GetCuratingACL returns the database ACL instance for direct access
func (c *Curating) GetCuratingACL() *database.CuratingACL {
return c.curatingACL
}
func (c *Curating) Syncer() {
log.I.F("starting curating ACL syncer")
// Start background cleanup goroutine
go c.backgroundCleanup()
}
// backgroundCleanup periodically cleans up expired data
func (c *Curating) backgroundCleanup() {
// Run cleanup every hour
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
case <-c.Ctx.Done():
log.D.F("curating ACL background cleanup stopped")
return
case <-ticker.C:
c.runCleanup()
}
}
}
func (c *Curating) runCleanup() {
log.D.F("curating ACL: running background cleanup")
// Clean up expired IP blocks
if err := c.curatingACL.CleanupExpiredIPBlocks(); err != nil {
log.W.F("curating ACL: failed to cleanup expired IP blocks: %v", err)
}
// Clean up old event counts (older than 7 days)
cutoffDate := time.Now().AddDate(0, 0, -7).Format("2006-01-02")
if err := c.curatingACL.CleanupOldEventCounts(cutoffDate); err != nil {
log.W.F("curating ACL: failed to cleanup old event counts: %v", err)
}
// Refresh caches
if err := c.RefreshCaches(); err != nil {
log.W.F("curating ACL: failed to refresh caches: %v", err)
}
}
// RefreshCaches refreshes all in-memory caches from the database
func (c *Curating) RefreshCaches() error {
c.cacheMx.Lock()
defer c.cacheMx.Unlock()
// Refresh trusted pubkeys cache
trusted, err := c.curatingACL.ListTrustedPubkeys()
if err != nil {
return errorf.E("failed to list trusted pubkeys: %v", err)
}
c.trustedCache = make(map[string]bool)
for _, t := range trusted {
c.trustedCache[t.Pubkey] = true
}
// Refresh blacklisted pubkeys cache
blacklisted, err := c.curatingACL.ListBlacklistedPubkeys()
if err != nil {
return errorf.E("failed to list blacklisted pubkeys: %v", err)
}
c.blacklistedCache = make(map[string]bool)
for _, b := range blacklisted {
c.blacklistedCache[b.Pubkey] = true
}
// Refresh config cache
config, err := c.curatingACL.GetConfig()
if err != nil {
return errorf.E("failed to get config: %v", err)
}
c.configCache = &config
// Refresh allowed kinds cache
c.kindCache = make(map[int]bool)
for _, k := range config.AllowedKinds {
c.kindCache[k] = true
}
log.D.F("curating ACL: caches refreshed - %d trusted, %d blacklisted, %d allowed kinds",
len(c.trustedCache), len(c.blacklistedCache), len(c.kindCache))
return nil
}
// GetConfig returns the current configuration
func (c *Curating) GetConfig() (database.CuratingConfig, error) {
c.cacheMx.RLock()
if c.configCache != nil {
config := *c.configCache
c.cacheMx.RUnlock()
return config, nil
}
c.cacheMx.RUnlock()
return c.curatingACL.GetConfig()
}
// IsConfigured returns true if the relay has been configured
func (c *Curating) IsConfigured() (bool, error) {
return c.curatingACL.IsConfigured()
}
// ProcessConfigEvent processes a kind 30078 event to extract curating configuration
func (c *Curating) ProcessConfigEvent(ev *event.E) error {
if ev.Kind != CuratingConfigKind {
return errorf.E("invalid event kind: expected %d, got %d", CuratingConfigKind, ev.Kind)
}
// Check d-tag
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag == nil || string(dTag.Value()) != CuratingConfigDTag {
return errorf.E("invalid d-tag: expected %s", CuratingConfigDTag)
}
// Check if pubkey is owner or admin
pubkeyHex := hex.EncodeToString(ev.Pubkey)
isOwner := false
isAdmin := false
for _, v := range c.owners {
if utils.FastEqual(v, ev.Pubkey) {
isOwner = true
break
}
}
if !isOwner {
for _, v := range c.admins {
if utils.FastEqual(v, ev.Pubkey) {
isAdmin = true
break
}
}
}
if !isOwner && !isAdmin {
return errorf.E("config event must be from owner or admin")
}
// Parse configuration from tags
config := database.CuratingConfig{
ConfigEventID: hex.EncodeToString(ev.ID[:]),
ConfigPubkey: pubkeyHex,
ConfiguredAt: ev.CreatedAt,
DailyLimit: DefaultDailyLimit,
FirstBanHours: DefaultFirstBanHours,
SecondBanHours: DefaultSecondBanHours,
}
for _, tag := range *ev.Tags {
if tag.Len() < 2 {
continue
}
key := string(tag.Key())
value := string(tag.Value())
switch key {
case "daily_limit":
if v, err := strconv.Atoi(value); err == nil && v > 0 {
config.DailyLimit = v
}
case "ip_daily_limit":
if v, err := strconv.Atoi(value); err == nil && v > 0 {
config.IPDailyLimit = v
}
case "first_ban_hours":
if v, err := strconv.Atoi(value); err == nil && v > 0 {
config.FirstBanHours = v
}
case "second_ban_hours":
if v, err := strconv.Atoi(value); err == nil && v > 0 {
config.SecondBanHours = v
}
case "kind_category":
config.KindCategories = append(config.KindCategories, value)
case "kind_range":
config.AllowedRanges = append(config.AllowedRanges, value)
case "kind":
if k, err := strconv.Atoi(value); err == nil {
config.AllowedKinds = append(config.AllowedKinds, k)
}
}
}
// Save configuration
if err := c.curatingACL.SaveConfig(config); err != nil {
return errorf.E("failed to save config: %v", err)
}
// Refresh caches
c.cacheMx.Lock()
c.configCache = &config
c.cacheMx.Unlock()
log.I.F("curating ACL: configuration updated from event %s by %s",
config.ConfigEventID, config.ConfigPubkey)
return nil
}
// IsTrusted checks if a pubkey is trusted
func (c *Curating) IsTrusted(pubkeyHex string) bool {
c.cacheMx.RLock()
if c.trustedCache[pubkeyHex] {
c.cacheMx.RUnlock()
return true
}
c.cacheMx.RUnlock()
trusted, _ := c.curatingACL.IsPubkeyTrusted(pubkeyHex)
return trusted
}
// IsBlacklisted checks if a pubkey is blacklisted
func (c *Curating) IsBlacklisted(pubkeyHex string) bool {
c.cacheMx.RLock()
if c.blacklistedCache[pubkeyHex] {
c.cacheMx.RUnlock()
return true
}
c.cacheMx.RUnlock()
blacklisted, _ := c.curatingACL.IsPubkeyBlacklisted(pubkeyHex)
return blacklisted
}
// TrustPubkey adds a pubkey to the trusted list
func (c *Curating) TrustPubkey(pubkeyHex, note string) error {
pubkeyHex = strings.ToLower(pubkeyHex)
if err := c.curatingACL.SaveTrustedPubkey(pubkeyHex, note); err != nil {
return err
}
// Update cache
c.cacheMx.Lock()
c.trustedCache[pubkeyHex] = true
delete(c.blacklistedCache, pubkeyHex) // Remove from blacklist cache if present
c.cacheMx.Unlock()
// Also remove from blacklist in DB
c.curatingACL.RemoveBlacklistedPubkey(pubkeyHex)
return nil
}
// UntrustPubkey removes a pubkey from the trusted list
func (c *Curating) UntrustPubkey(pubkeyHex string) error {
pubkeyHex = strings.ToLower(pubkeyHex)
if err := c.curatingACL.RemoveTrustedPubkey(pubkeyHex); err != nil {
return err
}
// Update cache
c.cacheMx.Lock()
delete(c.trustedCache, pubkeyHex)
c.cacheMx.Unlock()
return nil
}
// BlacklistPubkey adds a pubkey to the blacklist
func (c *Curating) BlacklistPubkey(pubkeyHex, reason string) error {
pubkeyHex = strings.ToLower(pubkeyHex)
if err := c.curatingACL.SaveBlacklistedPubkey(pubkeyHex, reason); err != nil {
return err
}
// Update cache
c.cacheMx.Lock()
c.blacklistedCache[pubkeyHex] = true
delete(c.trustedCache, pubkeyHex) // Remove from trusted cache if present
c.cacheMx.Unlock()
// Also remove from trusted list in DB
c.curatingACL.RemoveTrustedPubkey(pubkeyHex)
return nil
}
// UnblacklistPubkey removes a pubkey from the blacklist
func (c *Curating) UnblacklistPubkey(pubkeyHex string) error {
pubkeyHex = strings.ToLower(pubkeyHex)
if err := c.curatingACL.RemoveBlacklistedPubkey(pubkeyHex); err != nil {
return err
}
// Update cache
c.cacheMx.Lock()
delete(c.blacklistedCache, pubkeyHex)
c.cacheMx.Unlock()
return nil
}
func init() {
Registry.Register(new(Curating))
}

View File

@@ -0,0 +1,989 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"encoding/json"
"fmt"
"sort"
"time"
"github.com/dgraph-io/badger/v4"
)
// CuratingConfig represents the configuration for curating ACL mode
// This is parsed from a kind 30078 event with d-tag "curating-config"
type CuratingConfig struct {
DailyLimit int `json:"daily_limit"` // Max events per day for unclassified users
IPDailyLimit int `json:"ip_daily_limit"` // Max events per day from a single IP (flood protection)
FirstBanHours int `json:"first_ban_hours"` // IP ban duration for first offense
SecondBanHours int `json:"second_ban_hours"` // IP ban duration for second+ offense
AllowedKinds []int `json:"allowed_kinds"` // Explicit kind numbers
AllowedRanges []string `json:"allowed_ranges"` // Kind ranges like "1000-1999"
KindCategories []string `json:"kind_categories"` // Category IDs like "social", "dm"
ConfigEventID string `json:"config_event_id"` // ID of the config event
ConfigPubkey string `json:"config_pubkey"` // Pubkey that published config
ConfiguredAt int64 `json:"configured_at"` // Timestamp of config event
}
// TrustedPubkey represents an explicitly trusted publisher
type TrustedPubkey struct {
Pubkey string `json:"pubkey"`
Note string `json:"note,omitempty"`
Added time.Time `json:"added"`
}
// BlacklistedPubkey represents a blacklisted publisher
type BlacklistedPubkey struct {
Pubkey string `json:"pubkey"`
Reason string `json:"reason,omitempty"`
Added time.Time `json:"added"`
}
// PubkeyEventCount tracks daily event counts for rate limiting
type PubkeyEventCount struct {
Pubkey string `json:"pubkey"`
Date string `json:"date"` // YYYY-MM-DD format
Count int `json:"count"`
LastEvent time.Time `json:"last_event"`
}
// IPOffense tracks rate limit violations from IPs
type IPOffense struct {
IP string `json:"ip"`
OffenseCount int `json:"offense_count"`
PubkeysHit []string `json:"pubkeys_hit"` // Pubkeys that hit rate limit from this IP
LastOffense time.Time `json:"last_offense"`
}
// CuratingBlockedIP represents a temporarily blocked IP with expiration
type CuratingBlockedIP struct {
IP string `json:"ip"`
Reason string `json:"reason"`
ExpiresAt time.Time `json:"expires_at"`
Added time.Time `json:"added"`
}
// SpamEvent represents an event flagged as spam
type SpamEvent struct {
EventID string `json:"event_id"`
Pubkey string `json:"pubkey"`
Reason string `json:"reason,omitempty"`
Added time.Time `json:"added"`
}
// UnclassifiedUser represents a user who hasn't been trusted or blacklisted
type UnclassifiedUser struct {
Pubkey string `json:"pubkey"`
EventCount int `json:"event_count"`
LastEvent time.Time `json:"last_event"`
}
// CuratingACL database operations
type CuratingACL struct {
*D
}
// NewCuratingACL creates a new CuratingACL instance
func NewCuratingACL(db *D) *CuratingACL {
return &CuratingACL{D: db}
}
// ==================== Configuration ====================
// SaveConfig saves the curating configuration
func (c *CuratingACL) SaveConfig(config CuratingConfig) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getConfigKey()
data, err := json.Marshal(config)
if err != nil {
return err
}
return txn.Set(key, data)
})
}
// GetConfig returns the curating configuration
func (c *CuratingACL) GetConfig() (CuratingConfig, error) {
var config CuratingConfig
err := c.View(func(txn *badger.Txn) error {
key := c.getConfigKey()
item, err := txn.Get(key)
if err != nil {
if err == badger.ErrKeyNotFound {
return nil // Return empty config
}
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
return json.Unmarshal(val, &config)
})
return config, err
}
// IsConfigured returns true if a configuration event has been set
func (c *CuratingACL) IsConfigured() (bool, error) {
config, err := c.GetConfig()
if err != nil {
return false, err
}
return config.ConfigEventID != "", nil
}
// ==================== Trusted Pubkeys ====================
// SaveTrustedPubkey saves a trusted pubkey to the database
func (c *CuratingACL) SaveTrustedPubkey(pubkey string, note string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getTrustedPubkeyKey(pubkey)
trusted := TrustedPubkey{
Pubkey: pubkey,
Note: note,
Added: time.Now(),
}
data, err := json.Marshal(trusted)
if err != nil {
return err
}
return txn.Set(key, data)
})
}
// RemoveTrustedPubkey removes a trusted pubkey from the database
func (c *CuratingACL) RemoveTrustedPubkey(pubkey string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getTrustedPubkeyKey(pubkey)
return txn.Delete(key)
})
}
// ListTrustedPubkeys returns all trusted pubkeys
func (c *CuratingACL) ListTrustedPubkeys() ([]TrustedPubkey, error) {
var trusted []TrustedPubkey
err := c.View(func(txn *badger.Txn) error {
prefix := c.getTrustedPubkeyPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var t TrustedPubkey
if err := json.Unmarshal(val, &t); err != nil {
continue
}
trusted = append(trusted, t)
}
return nil
})
return trusted, err
}
// IsPubkeyTrusted checks if a pubkey is trusted
func (c *CuratingACL) IsPubkeyTrusted(pubkey string) (bool, error) {
var trusted bool
err := c.View(func(txn *badger.Txn) error {
key := c.getTrustedPubkeyKey(pubkey)
_, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
trusted = false
return nil
}
if err != nil {
return err
}
trusted = true
return nil
})
return trusted, err
}
// ==================== Blacklisted Pubkeys ====================
// SaveBlacklistedPubkey saves a blacklisted pubkey to the database
func (c *CuratingACL) SaveBlacklistedPubkey(pubkey string, reason string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getBlacklistedPubkeyKey(pubkey)
blacklisted := BlacklistedPubkey{
Pubkey: pubkey,
Reason: reason,
Added: time.Now(),
}
data, err := json.Marshal(blacklisted)
if err != nil {
return err
}
return txn.Set(key, data)
})
}
// RemoveBlacklistedPubkey removes a blacklisted pubkey from the database
func (c *CuratingACL) RemoveBlacklistedPubkey(pubkey string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getBlacklistedPubkeyKey(pubkey)
return txn.Delete(key)
})
}
// ListBlacklistedPubkeys returns all blacklisted pubkeys
func (c *CuratingACL) ListBlacklistedPubkeys() ([]BlacklistedPubkey, error) {
var blacklisted []BlacklistedPubkey
err := c.View(func(txn *badger.Txn) error {
prefix := c.getBlacklistedPubkeyPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var b BlacklistedPubkey
if err := json.Unmarshal(val, &b); err != nil {
continue
}
blacklisted = append(blacklisted, b)
}
return nil
})
return blacklisted, err
}
// IsPubkeyBlacklisted checks if a pubkey is blacklisted
func (c *CuratingACL) IsPubkeyBlacklisted(pubkey string) (bool, error) {
var blacklisted bool
err := c.View(func(txn *badger.Txn) error {
key := c.getBlacklistedPubkeyKey(pubkey)
_, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
blacklisted = false
return nil
}
if err != nil {
return err
}
blacklisted = true
return nil
})
return blacklisted, err
}
// ==================== Event Counting ====================
// GetEventCount returns the event count for a pubkey on a specific date
func (c *CuratingACL) GetEventCount(pubkey, date string) (int, error) {
var count int
err := c.View(func(txn *badger.Txn) error {
key := c.getEventCountKey(pubkey, date)
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
count = 0
return nil
}
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
var ec PubkeyEventCount
if err := json.Unmarshal(val, &ec); err != nil {
return err
}
count = ec.Count
return nil
})
return count, err
}
// IncrementEventCount increments and returns the new event count for a pubkey
func (c *CuratingACL) IncrementEventCount(pubkey, date string) (int, error) {
var newCount int
err := c.Update(func(txn *badger.Txn) error {
key := c.getEventCountKey(pubkey, date)
var ec PubkeyEventCount
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
ec = PubkeyEventCount{
Pubkey: pubkey,
Date: date,
Count: 0,
LastEvent: time.Now(),
}
} else if err != nil {
return err
} else {
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
if err := json.Unmarshal(val, &ec); err != nil {
return err
}
}
ec.Count++
ec.LastEvent = time.Now()
newCount = ec.Count
data, err := json.Marshal(ec)
if err != nil {
return err
}
return txn.Set(key, data)
})
return newCount, err
}
// CleanupOldEventCounts removes event counts older than the specified date
func (c *CuratingACL) CleanupOldEventCounts(beforeDate string) error {
return c.Update(func(txn *badger.Txn) error {
prefix := c.getEventCountPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
var keysToDelete [][]byte
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var ec PubkeyEventCount
if err := json.Unmarshal(val, &ec); err != nil {
continue
}
if ec.Date < beforeDate {
keysToDelete = append(keysToDelete, item.KeyCopy(nil))
}
}
for _, key := range keysToDelete {
if err := txn.Delete(key); err != nil {
return err
}
}
return nil
})
}
// ==================== IP Event Counting ====================
// IPEventCount tracks events from an IP address per day (flood protection)
type IPEventCount struct {
IP string `json:"ip"`
Date string `json:"date"`
Count int `json:"count"`
LastEvent time.Time `json:"last_event"`
}
// GetIPEventCount returns the total event count for an IP on a specific date
func (c *CuratingACL) GetIPEventCount(ip, date string) (int, error) {
var count int
err := c.View(func(txn *badger.Txn) error {
key := c.getIPEventCountKey(ip, date)
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
count = 0
return nil
}
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
var ec IPEventCount
if err := json.Unmarshal(val, &ec); err != nil {
return err
}
count = ec.Count
return nil
})
return count, err
}
// IncrementIPEventCount increments and returns the new event count for an IP
func (c *CuratingACL) IncrementIPEventCount(ip, date string) (int, error) {
var newCount int
err := c.Update(func(txn *badger.Txn) error {
key := c.getIPEventCountKey(ip, date)
var ec IPEventCount
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
ec = IPEventCount{
IP: ip,
Date: date,
Count: 0,
LastEvent: time.Now(),
}
} else if err != nil {
return err
} else {
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
if err := json.Unmarshal(val, &ec); err != nil {
return err
}
}
ec.Count++
ec.LastEvent = time.Now()
newCount = ec.Count
data, err := json.Marshal(ec)
if err != nil {
return err
}
return txn.Set(key, data)
})
return newCount, err
}
// CleanupOldIPEventCounts removes IP event counts older than the specified date
func (c *CuratingACL) CleanupOldIPEventCounts(beforeDate string) error {
return c.Update(func(txn *badger.Txn) error {
prefix := c.getIPEventCountPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
var keysToDelete [][]byte
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var ec IPEventCount
if err := json.Unmarshal(val, &ec); err != nil {
continue
}
if ec.Date < beforeDate {
keysToDelete = append(keysToDelete, item.KeyCopy(nil))
}
}
for _, key := range keysToDelete {
if err := txn.Delete(key); err != nil {
return err
}
}
return nil
})
}
func (c *CuratingACL) getIPEventCountKey(ip, date string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_IP_EVENT_COUNT_")
buf.WriteString(ip)
buf.WriteString("_")
buf.WriteString(date)
return buf.Bytes()
}
func (c *CuratingACL) getIPEventCountPrefix() []byte {
return []byte("CURATING_ACL_IP_EVENT_COUNT_")
}
// ==================== IP Offense Tracking ====================
// GetIPOffense returns the offense record for an IP
func (c *CuratingACL) GetIPOffense(ip string) (*IPOffense, error) {
var offense *IPOffense
err := c.View(func(txn *badger.Txn) error {
key := c.getIPOffenseKey(ip)
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
offense = new(IPOffense)
return json.Unmarshal(val, offense)
})
return offense, err
}
// RecordIPOffense records a rate limit violation from an IP for a pubkey
// Returns the new offense count
func (c *CuratingACL) RecordIPOffense(ip, pubkey string) (int, error) {
var newCount int
err := c.Update(func(txn *badger.Txn) error {
key := c.getIPOffenseKey(ip)
var offense IPOffense
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
offense = IPOffense{
IP: ip,
OffenseCount: 0,
PubkeysHit: []string{},
LastOffense: time.Now(),
}
} else if err != nil {
return err
} else {
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
if err := json.Unmarshal(val, &offense); err != nil {
return err
}
}
// Add pubkey if not already in list
found := false
for _, p := range offense.PubkeysHit {
if p == pubkey {
found = true
break
}
}
if !found {
offense.PubkeysHit = append(offense.PubkeysHit, pubkey)
offense.OffenseCount++
}
offense.LastOffense = time.Now()
newCount = offense.OffenseCount
data, err := json.Marshal(offense)
if err != nil {
return err
}
return txn.Set(key, data)
})
return newCount, err
}
// ==================== IP Blocking ====================
// BlockIP blocks an IP for a specified duration
func (c *CuratingACL) BlockIP(ip string, duration time.Duration, reason string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getBlockedIPKey(ip)
blocked := CuratingBlockedIP{
IP: ip,
Reason: reason,
ExpiresAt: time.Now().Add(duration),
Added: time.Now(),
}
data, err := json.Marshal(blocked)
if err != nil {
return err
}
return txn.Set(key, data)
})
}
// UnblockIP removes an IP from the blocked list
func (c *CuratingACL) UnblockIP(ip string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getBlockedIPKey(ip)
return txn.Delete(key)
})
}
// IsIPBlocked checks if an IP is blocked and returns expiration time
func (c *CuratingACL) IsIPBlocked(ip string) (bool, time.Time, error) {
var blocked bool
var expiresAt time.Time
err := c.View(func(txn *badger.Txn) error {
key := c.getBlockedIPKey(ip)
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
blocked = false
return nil
}
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
var b CuratingBlockedIP
if err := json.Unmarshal(val, &b); err != nil {
return err
}
if time.Now().After(b.ExpiresAt) {
// Block has expired
blocked = false
return nil
}
blocked = true
expiresAt = b.ExpiresAt
return nil
})
return blocked, expiresAt, err
}
// ListBlockedIPs returns all blocked IPs (including expired ones)
func (c *CuratingACL) ListBlockedIPs() ([]CuratingBlockedIP, error) {
var blocked []CuratingBlockedIP
err := c.View(func(txn *badger.Txn) error {
prefix := c.getBlockedIPPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var b CuratingBlockedIP
if err := json.Unmarshal(val, &b); err != nil {
continue
}
blocked = append(blocked, b)
}
return nil
})
return blocked, err
}
// CleanupExpiredIPBlocks removes expired IP blocks
func (c *CuratingACL) CleanupExpiredIPBlocks() error {
return c.Update(func(txn *badger.Txn) error {
prefix := c.getBlockedIPPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
now := time.Now()
var keysToDelete [][]byte
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var b CuratingBlockedIP
if err := json.Unmarshal(val, &b); err != nil {
continue
}
if now.After(b.ExpiresAt) {
keysToDelete = append(keysToDelete, item.KeyCopy(nil))
}
}
for _, key := range keysToDelete {
if err := txn.Delete(key); err != nil {
return err
}
}
return nil
})
}
// ==================== Spam Events ====================
// MarkEventAsSpam marks an event as spam
func (c *CuratingACL) MarkEventAsSpam(eventID, pubkey, reason string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getSpamEventKey(eventID)
spam := SpamEvent{
EventID: eventID,
Pubkey: pubkey,
Reason: reason,
Added: time.Now(),
}
data, err := json.Marshal(spam)
if err != nil {
return err
}
return txn.Set(key, data)
})
}
// UnmarkEventAsSpam removes the spam flag from an event
func (c *CuratingACL) UnmarkEventAsSpam(eventID string) error {
return c.Update(func(txn *badger.Txn) error {
key := c.getSpamEventKey(eventID)
return txn.Delete(key)
})
}
// IsEventSpam checks if an event is marked as spam
func (c *CuratingACL) IsEventSpam(eventID string) (bool, error) {
var spam bool
err := c.View(func(txn *badger.Txn) error {
key := c.getSpamEventKey(eventID)
_, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
spam = false
return nil
}
if err != nil {
return err
}
spam = true
return nil
})
return spam, err
}
// ListSpamEvents returns all spam events
func (c *CuratingACL) ListSpamEvents() ([]SpamEvent, error) {
var spam []SpamEvent
err := c.View(func(txn *badger.Txn) error {
prefix := c.getSpamEventPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var s SpamEvent
if err := json.Unmarshal(val, &s); err != nil {
continue
}
spam = append(spam, s)
}
return nil
})
return spam, err
}
// ==================== Unclassified Users ====================
// ListUnclassifiedUsers returns users who are neither trusted nor blacklisted
// sorted by event count descending
func (c *CuratingACL) ListUnclassifiedUsers(limit int) ([]UnclassifiedUser, error) {
// First, get all trusted and blacklisted pubkeys to exclude
trusted, err := c.ListTrustedPubkeys()
if err != nil {
return nil, err
}
blacklisted, err := c.ListBlacklistedPubkeys()
if err != nil {
return nil, err
}
excludeSet := make(map[string]struct{})
for _, t := range trusted {
excludeSet[t.Pubkey] = struct{}{}
}
for _, b := range blacklisted {
excludeSet[b.Pubkey] = struct{}{}
}
// Now iterate through event counts and aggregate by pubkey
pubkeyCounts := make(map[string]*UnclassifiedUser)
err = c.View(func(txn *badger.Txn) error {
prefix := c.getEventCountPrefix()
it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
continue
}
var ec PubkeyEventCount
if err := json.Unmarshal(val, &ec); err != nil {
continue
}
// Skip if trusted or blacklisted
if _, excluded := excludeSet[ec.Pubkey]; excluded {
continue
}
if existing, ok := pubkeyCounts[ec.Pubkey]; ok {
existing.EventCount += ec.Count
if ec.LastEvent.After(existing.LastEvent) {
existing.LastEvent = ec.LastEvent
}
} else {
pubkeyCounts[ec.Pubkey] = &UnclassifiedUser{
Pubkey: ec.Pubkey,
EventCount: ec.Count,
LastEvent: ec.LastEvent,
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Convert to slice and sort by event count descending
var users []UnclassifiedUser
for _, u := range pubkeyCounts {
users = append(users, *u)
}
sort.Slice(users, func(i, j int) bool {
return users[i].EventCount > users[j].EventCount
})
// Apply limit
if limit > 0 && len(users) > limit {
users = users[:limit]
}
return users, nil
}
// ==================== Key Generation ====================
func (c *CuratingACL) getConfigKey() []byte {
return []byte("CURATING_ACL_CONFIG")
}
func (c *CuratingACL) getTrustedPubkeyKey(pubkey string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_TRUSTED_PUBKEY_")
buf.WriteString(pubkey)
return buf.Bytes()
}
func (c *CuratingACL) getTrustedPubkeyPrefix() []byte {
return []byte("CURATING_ACL_TRUSTED_PUBKEY_")
}
func (c *CuratingACL) getBlacklistedPubkeyKey(pubkey string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_BLACKLISTED_PUBKEY_")
buf.WriteString(pubkey)
return buf.Bytes()
}
func (c *CuratingACL) getBlacklistedPubkeyPrefix() []byte {
return []byte("CURATING_ACL_BLACKLISTED_PUBKEY_")
}
func (c *CuratingACL) getEventCountKey(pubkey, date string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_EVENT_COUNT_")
buf.WriteString(pubkey)
buf.WriteString("_")
buf.WriteString(date)
return buf.Bytes()
}
func (c *CuratingACL) getEventCountPrefix() []byte {
return []byte("CURATING_ACL_EVENT_COUNT_")
}
func (c *CuratingACL) getIPOffenseKey(ip string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_IP_OFFENSE_")
buf.WriteString(ip)
return buf.Bytes()
}
func (c *CuratingACL) getBlockedIPKey(ip string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_BLOCKED_IP_")
buf.WriteString(ip)
return buf.Bytes()
}
func (c *CuratingACL) getBlockedIPPrefix() []byte {
return []byte("CURATING_ACL_BLOCKED_IP_")
}
func (c *CuratingACL) getSpamEventKey(eventID string) []byte {
buf := new(bytes.Buffer)
buf.WriteString("CURATING_ACL_SPAM_EVENT_")
buf.WriteString(eventID)
return buf.Bytes()
}
func (c *CuratingACL) getSpamEventPrefix() []byte {
return []byte("CURATING_ACL_SPAM_EVENT_")
}
// ==================== Kind Checking Helpers ====================
// IsKindAllowed checks if an event kind is allowed based on config
func (c *CuratingACL) IsKindAllowed(kind int, config *CuratingConfig) bool {
if config == nil {
return false
}
// Check explicit kinds
for _, k := range config.AllowedKinds {
if k == kind {
return true
}
}
// Check ranges
for _, rangeStr := range config.AllowedRanges {
if kindInRange(kind, rangeStr) {
return true
}
}
// Check categories
for _, cat := range config.KindCategories {
if kindInCategory(kind, cat) {
return true
}
}
return false
}
// kindInRange checks if a kind is within a range string like "1000-1999"
func kindInRange(kind int, rangeStr string) bool {
var start, end int
n, err := fmt.Sscanf(rangeStr, "%d-%d", &start, &end)
if err != nil || n != 2 {
return false
}
return kind >= start && kind <= end
}
// kindInCategory checks if a kind belongs to a predefined category
func kindInCategory(kind int, category string) bool {
categories := map[string][]int{
"social": {0, 1, 3, 6, 7, 10002},
"dm": {4, 14, 1059},
"longform": {30023, 30024},
"media": {1063, 20, 21, 22},
"marketplace": {30017, 30018, 30019, 30020, 1021, 1022},
"groups_nip29": {9, 10, 11, 12, 9000, 9001, 9002, 39000, 39001, 39002},
"groups_nip72": {34550, 1111, 4550},
"lists": {10000, 10001, 10003, 30000, 30001, 30003},
}
kinds, ok := categories[category]
if !ok {
return false
}
for _, k := range kinds {
if k == kind {
return true
}
}
return false
}

View File

@@ -38,5 +38,27 @@ func (a *GraphAdapter) TraverseThread(seedEventID []byte, maxDepth int, directio
return a.db.TraverseThread(seedEventID, maxDepth, direction)
}
// CollectInboundRefs implements graph.GraphDatabase.
// It collects events that reference items in the result.
func (a *GraphAdapter) CollectInboundRefs(result graph.GraphResultI, depth int, kinds []uint16) error {
// Type assert to get the concrete GraphResult
graphResult, ok := result.(*GraphResult)
if !ok {
return nil // Can't collect refs if we don't have a GraphResult
}
return a.db.AddInboundRefsToResult(graphResult, depth, kinds)
}
// CollectOutboundRefs implements graph.GraphDatabase.
// It collects events referenced by items in the result.
func (a *GraphAdapter) CollectOutboundRefs(result graph.GraphResultI, depth int, kinds []uint16) error {
// Type assert to get the concrete GraphResult
graphResult, ok := result.(*GraphResult)
if !ok {
return nil
}
return a.db.AddOutboundRefsToResult(graphResult, depth, kinds)
}
// Verify GraphAdapter implements graph.GraphDatabase
var _ graph.GraphDatabase = (*GraphAdapter)(nil)

View File

@@ -325,3 +325,13 @@ func (r *GraphResult) GetTotalPubkeys() int {
func (r *GraphResult) GetTotalEvents() int {
return r.TotalEvents
}
// GetInboundRefs returns the InboundRefs map for external access.
func (r *GraphResult) GetInboundRefs() map[uint16]map[string][]string {
return r.InboundRefs
}
// GetOutboundRefs returns the OutboundRefs map for external access.
func (r *GraphResult) GetOutboundRefs() map[uint16]map[string][]string {
return r.OutboundRefs
}

View File

@@ -13,12 +13,13 @@ import (
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/ints"
"git.mleku.dev/mleku/nostr/encoders/kind"
)
const (
currentVersion uint32 = 7
currentVersion uint32 = 8
)
func (d *D) RunMigrations() {
@@ -115,6 +116,14 @@ func (d *D) RunMigrations() {
// bump to version 7
_ = d.writeVersionTag(7)
}
if dbVersion < 8 {
log.I.F("migrating to version 8...")
// Backfill e-tag graph indexes (eeg/gee) for graph query support
// This creates edges for all existing events with e-tags
d.BackfillETagGraph()
// bump to version 8
_ = d.writeVersionTag(8)
}
}
// writeVersionTag writes a new version tag key to the database (no value)
@@ -1079,3 +1088,183 @@ func (d *D) RebuildWordIndexesWithNormalization() {
log.I.F("word index rebuild with unicode normalization complete")
}
// BackfillETagGraph populates e-tag graph indexes (eeg/gee) for all existing events.
// This enables graph traversal queries for thread/reply discovery.
//
// The migration:
// 1. Iterates all events in compact storage (cmp prefix)
// 2. Extracts e-tags from each event
// 3. For e-tags referencing events we have, creates bidirectional edges:
// - eeg|source|target|kind|direction(out) - forward edge
// - gee|target|kind|direction(in)|source - reverse edge
//
// This is idempotent: running multiple times won't create duplicate edges
// (BadgerDB overwrites existing keys).
func (d *D) BackfillETagGraph() {
log.I.F("backfilling e-tag graph indexes for graph query support...")
var err error
type ETagEdge struct {
SourceSerial *types.Uint40
TargetSerial *types.Uint40
Kind *types.Uint16
}
var edges []ETagEdge
var processedEvents int
var eventsWithETags int
var skippedTargets int
// First pass: collect all e-tag edges from events
if err = d.View(func(txn *badger.Txn) error {
// Iterate compact events (cmp prefix)
cmpPrf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
return err
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
// Extract serial from key (prefix 3 bytes + serial 5 bytes)
if len(key) < 8 {
continue
}
sourceSerial := new(types.Uint40)
if err = sourceSerial.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
continue
}
// Get event data
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
// Decode the event
// First get the event ID from serial (needed for compact format decoding)
eventId, idErr := d.GetEventIdBySerial(sourceSerial)
if idErr != nil {
continue
}
resolver := NewDatabaseSerialResolver(d, d.serialCache)
ev, decErr := UnmarshalCompactEvent(val, eventId, resolver)
if decErr != nil || ev == nil {
continue
}
processedEvents++
// Extract e-tags
eTags := ev.Tags.GetAll([]byte("e"))
if len(eTags) == 0 {
continue
}
eventsWithETags++
eventKind := new(types.Uint16)
eventKind.Set(ev.Kind)
for _, eTag := range eTags {
if eTag.Len() < 2 {
continue
}
// Get event ID from e-tag
var targetEventID []byte
targetEventID, err = hex.Dec(string(eTag.ValueHex()))
if err != nil || len(targetEventID) != 32 {
continue
}
// Look up target event's serial
targetSerial, lookupErr := d.GetSerialById(targetEventID)
if lookupErr != nil || targetSerial == nil {
// Target event not in our database - skip
skippedTargets++
continue
}
edges = append(edges, ETagEdge{
SourceSerial: sourceSerial,
TargetSerial: targetSerial,
Kind: eventKind,
})
}
}
return nil
}); chk.E(err) {
log.E.F("e-tag graph backfill: failed to collect edges: %v", err)
return
}
log.I.F("e-tag graph backfill: processed %d events, %d with e-tags, found %d edges to create (%d targets not found)",
processedEvents, eventsWithETags, len(edges), skippedTargets)
if len(edges) == 0 {
log.I.F("e-tag graph backfill: no edges to create")
return
}
// Sort edges for ordered writes (improves compaction)
sort.Slice(edges, func(i, j int) bool {
if edges[i].SourceSerial.Get() != edges[j].SourceSerial.Get() {
return edges[i].SourceSerial.Get() < edges[j].SourceSerial.Get()
}
return edges[i].TargetSerial.Get() < edges[j].TargetSerial.Get()
})
// Second pass: write edges in batches
const batchSize = 1000
var createdEdges int
for i := 0; i < len(edges); i += batchSize {
end := i + batchSize
if end > len(edges) {
end = len(edges)
}
batch := edges[i:end]
if err = d.Update(func(txn *badger.Txn) error {
for _, edge := range batch {
// Create forward edge: eeg|source|target|kind|direction(out)
directionOut := new(types.Letter)
directionOut.Set(types.EdgeDirectionETagOut)
keyBuf := new(bytes.Buffer)
if err = indexes.EventEventGraphEnc(edge.SourceSerial, edge.TargetSerial, edge.Kind, directionOut).MarshalWrite(keyBuf); chk.E(err) {
continue
}
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
continue
}
// Create reverse edge: gee|target|kind|direction(in)|source
directionIn := new(types.Letter)
directionIn.Set(types.EdgeDirectionETagIn)
keyBuf.Reset()
if err = indexes.GraphEventEventEnc(edge.TargetSerial, edge.Kind, directionIn, edge.SourceSerial).MarshalWrite(keyBuf); chk.E(err) {
continue
}
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
continue
}
createdEdges++
}
return nil
}); chk.E(err) {
log.W.F("e-tag graph backfill: batch write failed: %v", err)
continue
}
if (i/batchSize)%10 == 0 && i > 0 {
log.I.F("e-tag graph backfill progress: %d/%d edges created", i, len(edges))
}
}
log.I.F("e-tag graph backfill complete: created %d bidirectional edges", createdEdges)
}

View File

@@ -36,5 +36,27 @@ func (a *GraphAdapter) TraverseThread(seedEventID []byte, maxDepth int, directio
return a.db.TraverseThread(seedEventID, maxDepth, direction)
}
// CollectInboundRefs implements graph.GraphDatabase.
// It collects events that reference items in the result.
func (a *GraphAdapter) CollectInboundRefs(result graph.GraphResultI, depth int, kinds []uint16) error {
// Type assert to get the concrete GraphResult
graphResult, ok := result.(*GraphResult)
if !ok {
return nil // Can't collect refs if we don't have a GraphResult
}
return a.db.AddInboundRefsToResult(graphResult, depth, kinds)
}
// CollectOutboundRefs implements graph.GraphDatabase.
// It collects events referenced by items in the result.
func (a *GraphAdapter) CollectOutboundRefs(result graph.GraphResultI, depth int, kinds []uint16) error {
// Type assert to get the concrete GraphResult
graphResult, ok := result.(*GraphResult)
if !ok {
return nil
}
return a.db.AddOutboundRefsToResult(graphResult, depth, kinds)
}
// Verify GraphAdapter implements graph.GraphDatabase
var _ graph.GraphDatabase = (*GraphAdapter)(nil)

163
pkg/neo4j/graph-refs.go Normal file
View File

@@ -0,0 +1,163 @@
package neo4j
import (
"context"
"fmt"
"strings"
)
// AddInboundRefsToResult collects inbound references (events that reference discovered items)
// for events at a specific depth in the result.
//
// For example, if you have a follows graph result and want to find all kind-7 reactions
// to posts by users at depth 1, this collects those reactions and adds them to result.InboundRefs.
//
// Parameters:
// - result: The graph result to augment with ref data
// - depth: The depth at which to collect refs (0 = all depths)
// - kinds: Event kinds to collect (e.g., [7] for reactions, [6] for reposts)
func (n *N) AddInboundRefsToResult(result *GraphResult, depth int, kinds []uint16) error {
ctx := context.Background()
// Get pubkeys to find refs for
var pubkeys []string
if depth == 0 {
pubkeys = result.GetAllPubkeys()
} else {
pubkeys = result.GetPubkeysAtDepth(depth)
}
if len(pubkeys) == 0 {
n.Logger.Debugf("AddInboundRefsToResult: no pubkeys at depth %d", depth)
return nil
}
// Convert kinds to int64 for Neo4j
kindsInt := make([]int64, len(kinds))
for i, k := range kinds {
kindsInt[i] = int64(k)
}
// Query for events by these pubkeys and their inbound references
// This finds: (ref:Event)-[:REFERENCES]->(authored:Event)<-[:AUTHORED_BY]-(u:NostrUser)
// where the referencing event has the specified kinds
cypher := `
UNWIND $pubkeys AS pk
MATCH (u:NostrUser {pubkey: pk})<-[:AUTHORED_BY]-(authored:Event)
WHERE authored.kind IN [1, 30023]
MATCH (ref:Event)-[:REFERENCES]->(authored)
WHERE ref.kind IN $kinds
RETURN authored.id AS target_id, ref.id AS ref_id, ref.kind AS ref_kind
`
params := map[string]any{
"pubkeys": pubkeys,
"kinds": kindsInt,
}
queryResult, err := n.ExecuteRead(ctx, cypher, params)
if err != nil {
return fmt.Errorf("failed to query inbound refs: %w", err)
}
refCount := 0
for queryResult.Next(ctx) {
record := queryResult.Record()
targetID, ok := record.Values[0].(string)
if !ok || targetID == "" {
continue
}
refID, ok := record.Values[1].(string)
if !ok || refID == "" {
continue
}
refKind, ok := record.Values[2].(int64)
if !ok {
continue
}
result.AddInboundRef(uint16(refKind), strings.ToLower(targetID), strings.ToLower(refID))
refCount++
}
n.Logger.Debugf("AddInboundRefsToResult: collected %d refs for %d pubkeys", refCount, len(pubkeys))
return nil
}
// AddOutboundRefsToResult collects outbound references (events referenced by discovered items).
//
// For example, find all events that posts by users at depth 1 reference (quoted posts, replied-to posts).
func (n *N) AddOutboundRefsToResult(result *GraphResult, depth int, kinds []uint16) error {
ctx := context.Background()
// Get pubkeys to find refs for
var pubkeys []string
if depth == 0 {
pubkeys = result.GetAllPubkeys()
} else {
pubkeys = result.GetPubkeysAtDepth(depth)
}
if len(pubkeys) == 0 {
n.Logger.Debugf("AddOutboundRefsToResult: no pubkeys at depth %d", depth)
return nil
}
// Convert kinds to int64 for Neo4j
kindsInt := make([]int64, len(kinds))
for i, k := range kinds {
kindsInt[i] = int64(k)
}
// Query for events by these pubkeys and their outbound references
// This finds: (authored:Event)-[:REFERENCES]->(ref:Event)
// where the authored event has the specified kinds
cypher := `
UNWIND $pubkeys AS pk
MATCH (u:NostrUser {pubkey: pk})<-[:AUTHORED_BY]-(authored:Event)
WHERE authored.kind IN $kinds
MATCH (authored)-[:REFERENCES]->(ref:Event)
RETURN authored.id AS source_id, ref.id AS ref_id, authored.kind AS source_kind
`
params := map[string]any{
"pubkeys": pubkeys,
"kinds": kindsInt,
}
queryResult, err := n.ExecuteRead(ctx, cypher, params)
if err != nil {
return fmt.Errorf("failed to query outbound refs: %w", err)
}
refCount := 0
for queryResult.Next(ctx) {
record := queryResult.Record()
sourceID, ok := record.Values[0].(string)
if !ok || sourceID == "" {
continue
}
refID, ok := record.Values[1].(string)
if !ok || refID == "" {
continue
}
sourceKind, ok := record.Values[2].(int64)
if !ok {
continue
}
result.AddOutboundRef(uint16(sourceKind), strings.ToLower(sourceID), strings.ToLower(refID))
refCount++
}
n.Logger.Debugf("AddOutboundRefsToResult: collected %d refs from %d pubkeys", refCount, len(pubkeys))
return nil
}

View File

@@ -33,6 +33,14 @@ type GraphResult struct {
// TotalEvents is the count of unique events discovered across all depths.
TotalEvents int
// InboundRefs tracks inbound references (events that reference discovered items).
// Structure: kind -> target_id -> []referencing_event_ids
InboundRefs map[uint16]map[string][]string
// OutboundRefs tracks outbound references (events referenced by discovered items).
// Structure: kind -> source_id -> []referenced_event_ids
OutboundRefs map[uint16]map[string][]string
}
// NewGraphResult creates a new initialized GraphResult.
@@ -42,6 +50,8 @@ func NewGraphResult() *GraphResult {
EventsByDepth: make(map[int][]string),
FirstSeenPubkey: make(map[string]int),
FirstSeenEvent: make(map[string]int),
InboundRefs: make(map[uint16]map[string][]string),
OutboundRefs: make(map[uint16]map[string][]string),
}
}
@@ -195,3 +205,45 @@ func (r *GraphResult) GetEventDepthsSorted() []int {
sort.Ints(depths)
return depths
}
// GetInboundRefs returns the InboundRefs map for external access.
func (r *GraphResult) GetInboundRefs() map[uint16]map[string][]string {
return r.InboundRefs
}
// GetOutboundRefs returns the OutboundRefs map for external access.
func (r *GraphResult) GetOutboundRefs() map[uint16]map[string][]string {
return r.OutboundRefs
}
// AddInboundRef records an inbound reference from a referencing event to a target.
func (r *GraphResult) AddInboundRef(kind uint16, targetIDHex string, referencingEventIDHex string) {
if r.InboundRefs[kind] == nil {
r.InboundRefs[kind] = make(map[string][]string)
}
r.InboundRefs[kind][targetIDHex] = append(r.InboundRefs[kind][targetIDHex], referencingEventIDHex)
}
// AddOutboundRef records an outbound reference from a source event to a referenced event.
func (r *GraphResult) AddOutboundRef(kind uint16, sourceIDHex string, referencedEventIDHex string) {
if r.OutboundRefs[kind] == nil {
r.OutboundRefs[kind] = make(map[string][]string)
}
r.OutboundRefs[kind][sourceIDHex] = append(r.OutboundRefs[kind][sourceIDHex], referencedEventIDHex)
}
// GetPubkeysAtDepth returns pubkeys at a specific depth, or empty slice if none.
func (r *GraphResult) GetPubkeysAtDepth(depth int) []string {
if pubkeys, exists := r.PubkeysByDepth[depth]; exists {
return pubkeys
}
return []string{}
}
// GetEventsAtDepth returns events at a specific depth, or empty slice if none.
func (r *GraphResult) GetEventsAtDepth(depth int) []string {
if events, exists := r.EventsByDepth[depth]; exists {
return events
}
return []string{}
}

View File

@@ -6,6 +6,7 @@ package graph
import (
"encoding/json"
"sort"
"strconv"
"time"
@@ -37,6 +38,9 @@ type GraphResultI interface {
GetEventsByDepth() map[int][]string
GetTotalPubkeys() int
GetTotalEvents() int
// Ref aggregation methods
GetInboundRefs() map[uint16]map[string][]string
GetOutboundRefs() map[uint16]map[string][]string
}
// GraphDatabase defines the interface for graph traversal operations.
@@ -50,6 +54,10 @@ type GraphDatabase interface {
FindMentions(pubkey []byte, kinds []uint16) (GraphResultI, error)
// TraverseThread performs BFS traversal of thread structure
TraverseThread(seedEventID []byte, maxDepth int, direction string) (GraphResultI, error)
// CollectInboundRefs finds events that reference items in the result
CollectInboundRefs(result GraphResultI, depth int, kinds []uint16) error
// CollectOutboundRefs finds events referenced by items in the result
CollectOutboundRefs(result GraphResultI, depth int, kinds []uint16) error
}
// Executor handles graph query execution and response generation.
@@ -138,6 +146,36 @@ func (e *Executor) Execute(q *Query) (*event.E, error) {
return nil, ErrInvalidMethod
}
// Collect inbound refs if specified
if q.HasInboundRefs() {
for _, refSpec := range q.InboundRefs {
kinds := make([]uint16, len(refSpec.Kinds))
for i, k := range refSpec.Kinds {
kinds[i] = uint16(k)
}
// Collect refs at the specified from_depth (0 = all depths)
if err = e.db.CollectInboundRefs(result, refSpec.FromDepth, kinds); err != nil {
log.W.F("graph executor: failed to collect inbound refs: %v", err)
// Continue without refs rather than failing the query
}
}
log.D.F("graph executor: collected inbound refs")
}
// Collect outbound refs if specified
if q.HasOutboundRefs() {
for _, refSpec := range q.OutboundRefs {
kinds := make([]uint16, len(refSpec.Kinds))
for i, k := range refSpec.Kinds {
kinds[i] = uint16(k)
}
if err = e.db.CollectOutboundRefs(result, refSpec.FromDepth, kinds); err != nil {
log.W.F("graph executor: failed to collect outbound refs: %v", err)
}
}
log.D.F("graph executor: collected outbound refs")
}
// Generate response event
return e.generateResponse(q, result, responseKind)
}
@@ -157,6 +195,14 @@ func (e *Executor) generateResponse(q *Query, result GraphResultI, responseKind
content.TotalEvents = result.GetTotalEvents()
}
// Add ref summaries if present
if inboundRefs := result.GetInboundRefs(); len(inboundRefs) > 0 {
content.InboundRefs = buildRefSummaries(inboundRefs)
}
if outboundRefs := result.GetOutboundRefs(); len(outboundRefs) > 0 {
content.OutboundRefs = buildRefSummaries(outboundRefs)
}
contentBytes, err := json.Marshal(content)
if err != nil {
return nil, err
@@ -199,4 +245,55 @@ type ResponseContent struct {
// TotalEvents is the total count of unique events discovered
TotalEvents int `json:"total_events,omitempty"`
// InboundRefs contains aggregated inbound references (events referencing discovered items)
// Structure: array of {kind, target, count, refs[]}
InboundRefs []RefSummary `json:"inbound_refs,omitempty"`
// OutboundRefs contains aggregated outbound references (events referenced by discovered items)
// Structure: array of {kind, source, count, refs[]}
OutboundRefs []RefSummary `json:"outbound_refs,omitempty"`
}
// RefSummary represents aggregated reference data for a single target/source.
type RefSummary struct {
// Kind is the kind of the referencing/referenced events
Kind uint16 `json:"kind"`
// Target is the event ID being referenced (for inbound) or referencing (for outbound)
Target string `json:"target"`
// Count is the number of references
Count int `json:"count"`
// Refs is the list of event IDs (optional, may be omitted for large sets)
Refs []string `json:"refs,omitempty"`
}
// buildRefSummaries converts the ref map structure to a sorted array of RefSummary.
// Results are sorted by count descending (most referenced first).
func buildRefSummaries(refs map[uint16]map[string][]string) []RefSummary {
var summaries []RefSummary
for kind, targets := range refs {
for targetID, refIDs := range targets {
summaries = append(summaries, RefSummary{
Kind: kind,
Target: targetID,
Count: len(refIDs),
Refs: refIDs,
})
}
}
// Sort by count descending
sort.Slice(summaries, func(i, j int) bool {
if summaries[i].Count != summaries[j].Count {
return summaries[i].Count > summaries[j].Count
}
// Secondary sort by kind for stability
return summaries[i].Kind < summaries[j].Kind
})
return summaries
}

View File

@@ -10,7 +10,7 @@ import (
)
// MinimumMemoryMB is the minimum memory required to run the relay with rate limiting.
const MinimumMemoryMB = 500
const MinimumMemoryMB = 128
// AutoDetectMemoryFraction is the fraction of available memory to use when auto-detecting.
const AutoDetectMemoryFraction = 0.66
@@ -20,7 +20,7 @@ const AutoDetectMemoryFraction = 0.66
const DefaultMaxMemoryMB = 1500
// ErrInsufficientMemory is returned when there isn't enough memory to run the relay.
var ErrInsufficientMemory = errors.New("insufficient memory: relay requires at least 500MB of available memory")
var ErrInsufficientMemory = errors.New("insufficient memory: relay requires at least 128MB of available memory")
// ProcessMemoryStats contains memory statistics for the current process.
// On Linux, these are read from /proc/self/status for accurate RSS values.

View File

@@ -1 +1 @@
v0.46.2
v0.47.0