Compare commits
5 Commits
| Author | SHA1 | Date |
|---|---|---|
| | bb858a0d6f | |
| | b478845e1c | |
| | e75e6de59b | |
| | 1297a45ee3 | |
| | 138d5cbff9 | |
```diff
@@ -207,8 +207,11 @@ type C struct {
 	ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`

 	// Storage management configuration (access-based garbage collection)
+	// TODO: GC implementation needs batch transaction handling to avoid Badger race conditions
+	// TODO: GC should use smaller batches with delays between transactions on large datasets
+	// TODO: GC deletion should be serialized or use transaction pools to prevent concurrent txn issues
 	MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
-	GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
+	GCEnabled bool `env:"ORLY_GC_ENABLED" default:"false" usage:"enable continuous garbage collection based on access patterns (EXPERIMENTAL - may cause crashes under load)"`
 	GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
 	GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
```
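The three GC TODOs above all describe the same fix: serialize deletions and process them in small batches with a pause between transactions. As a rough illustration only, here is a minimal Go sketch of that pattern; the `gcDeleter` type and its `deleteBatch` callback are hypothetical stand-ins, not part of ORLY's codebase:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// gcDeleter serializes batched deletions so only one transaction runs at a
// time, with a pause between batches. deleteBatch is a placeholder for
// whatever single-transaction delete the store provides.
type gcDeleter struct {
	batchSize   int
	batchDelay  time.Duration
	deleteBatch func(ctx context.Context, serials []uint64) error
}

// run deletes candidates in fixed-size batches, one batch at a time.
func (g *gcDeleter) run(ctx context.Context, candidates []uint64) error {
	for start := 0; start < len(candidates); start += g.batchSize {
		end := start + g.batchSize
		if end > len(candidates) {
			end = len(candidates)
		}
		if err := g.deleteBatch(ctx, candidates[start:end]); err != nil {
			return fmt.Errorf("batch %d-%d: %w", start, end, err)
		}
		// Pause between transactions so concurrent readers are not starved.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(g.batchDelay):
		}
	}
	return nil
}

func main() {
	g := &gcDeleter{
		batchSize:  100,
		batchDelay: 50 * time.Millisecond,
		deleteBatch: func(ctx context.Context, serials []uint64) error {
			fmt.Printf("deleting %d events in one transaction\n", len(serials))
			return nil
		},
	}
	_ = g.run(context.Background(), make([]uint64, 1000))
}
```

Funneling all deletions through one loop sidesteps the concurrent-transaction races the TODOs mention, at the cost of slower reclamation.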
app/web/dist/bundle.js (vendored)
File diff suppressed because one or more lines are too long

app/web/dist/bundle.js.map (vendored)
File diff suppressed because one or more lines are too long
```diff
@@ -26,7 +26,7 @@ export function initConfig() {
   // 4. Not running on a typical relay port (3334) - likely a static server
   const hasStoredRelay = !!localStorage.getItem("relayUrl");
   const isFileProtocol = window.location.protocol === 'file:';
-  const isNonRelayPort = !['3334', '443', '80', ''].includes(window.location.port);
+  const isNonRelayPort = !['3334', '7777', '443', '80', ''].includes(window.location.port);

   const standalone = BUILD_STANDALONE_MODE || hasStoredRelay || isFileProtocol || isNonRelayPort;
   isStandaloneMode.set(standalone);
```
```diff
@@ -951,11 +951,10 @@ export async function fetchAllEvents(options = {}) {
   } = options;

   const now = Math.floor(Date.now() / 1000);
-  const thirtyDaysAgo = now - (30 * 24 * 60 * 60);
-  const sixMonthsAgo = now - (180 * 24 * 60 * 60);
+  const fiveYearsAgo = now - (5 * 365 * 24 * 60 * 60);

-  // Start with 30 days if no since specified
-  const initialSince = since || thirtyDaysAgo;
+  // Start with 5 years if no since specified
+  const initialSince = since || fiveYearsAgo;

   const filters = [{ ...rest }];
   filters[0].since = initialSince;
@@ -964,21 +963,10 @@ export async function fetchAllEvents(options = {}) {
   if (kinds) filters[0].kinds = kinds;
   if (limit) filters[0].limit = limit;

-  let events = await fetchEvents(filters, {
+  const events = await fetchEvents(filters, {
     timeout: 30000
   });

-  // If we got few results and weren't already using a longer window, retry with 6 months
-  const fewResultsThreshold = Math.min(20, limit / 2);
-  if (events.length < fewResultsThreshold && initialSince > sixMonthsAgo && !since) {
-    console.log(`[fetchAllEvents] Only got ${events.length} events, retrying with 6-month window...`);
-    filters[0].since = sixMonthsAgo;
-    events = await fetchEvents(filters, {
-      timeout: 30000
-    });
-    console.log(`[fetchAllEvents] 6-month window returned ${events.length} events`);
-  }

   return events;
 }
```
```diff
@@ -590,14 +590,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
 		if f.Limit != nil && len(evs) > int(*f.Limit) {
 			evs = evs[:*f.Limit]
 		}
-		// delete the expired events in a background thread
-		go func() {
-			for i, ser := range expDeletes {
-				if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
-					continue
-				}
-			}
-		}()
+		// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
+		// under high concurrent load ("assignment to entry in nil map" panic).
+		// Expired events should be cleaned up by a separate, rate-limited background
+		// worker instead of being deleted inline during query processing.
+		// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
+		if len(expDeletes) > 0 {
+			log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
+		}
 	}

 	return
```
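The replacement comment points at a separate, rate-limited background worker. A minimal sketch of that shape, where `deleteOne` is a hypothetical stand-in for the store's single-event delete (ORLY's actual `DeleteEventBySerial` has a different signature):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// expiryWorker drains expired-event serials from a queue and deletes them at
// a fixed rate, off the query path.
func expiryWorker(
	ctx context.Context,
	queue <-chan uint64,
	rate time.Duration,
	deleteOne func(ctx context.Context, serial uint64) error,
) {
	tick := time.NewTicker(rate)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Delete at most one event per tick; anything still queued
			// waits for the next tick, which bounds the delete rate.
			select {
			case serial := <-queue:
				if err := deleteOne(ctx, serial); err != nil {
					fmt.Printf("expiry delete %d: %v\n", serial, err)
				}
			default:
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	queue := make(chan uint64, 1024)
	// A query handler would enqueue serials here instead of deleting inline.
	for s := uint64(1); s <= 5; s++ {
		queue <- s
	}
	expiryWorker(ctx, queue, 100*time.Millisecond, func(_ context.Context, serial uint64) error {
		fmt.Println("deleted", serial)
		return nil
	})
}
```

Queries only enqueue; the worker owns all delete transactions, so no deletion ever races a concurrent REQ.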
```diff
@@ -3,12 +3,15 @@ package neo4j
 import (
 	"context"
 	"fmt"
+	"sort"
+	"strconv"
+	"strings"
 	"time"

 	"git.mleku.dev/mleku/nostr/encoders/event"
 	"git.mleku.dev/mleku/nostr/encoders/filter"
 	"git.mleku.dev/mleku/nostr/encoders/hex"
 	"git.mleku.dev/mleku/nostr/encoders/kind"
 	"git.mleku.dev/mleku/nostr/encoders/tag"
 	"lol.mleku.dev/log"
 	"next.orly.dev/pkg/database/indexes/types"
@@ -41,11 +44,81 @@ func (n *N) QueryEventsWithOptions(
 	}

 	// Parse response
-	evs, err = n.parseEventsFromResult(result)
+	allEvents, err := n.parseEventsFromResult(result)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse events: %w", err)
 	}

+	// Filter replaceable events to only return the latest version
+	// unless showAllVersions is true
+	if showAllVersions {
+		return allEvents, nil
+	}
+
+	// Separate events by type and filter replaceables
+	replaceableEvents := make(map[string]*event.E)                 // key: pubkey:kind
+	paramReplaceableEvents := make(map[string]map[string]*event.E) // key: pubkey:kind -> d-tag -> event
+	var regularEvents event.S
+
+	for _, ev := range allEvents {
+		if kind.IsReplaceable(ev.Kind) {
+			// For replaceable events, keep only the latest per pubkey:kind
+			key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
+			existing, exists := replaceableEvents[key]
+			if !exists || ev.CreatedAt > existing.CreatedAt {
+				replaceableEvents[key] = ev
+			}
+		} else if kind.IsParameterizedReplaceable(ev.Kind) {
+			// For parameterized replaceable events, keep only the latest per pubkey:kind:d-tag
+			key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
+
+			// Get the 'd' tag value
+			dTag := ev.Tags.GetFirst([]byte("d"))
+			var dValue string
+			if dTag != nil && dTag.Len() > 1 {
+				dValue = string(dTag.Value())
+			}
+
+			// Initialize inner map if needed
+			if _, exists := paramReplaceableEvents[key]; !exists {
+				paramReplaceableEvents[key] = make(map[string]*event.E)
+			}
+
+			// Keep only the newest version
+			existing, exists := paramReplaceableEvents[key][dValue]
+			if !exists || ev.CreatedAt > existing.CreatedAt {
+				paramReplaceableEvents[key][dValue] = ev
+			}
+		} else {
+			regularEvents = append(regularEvents, ev)
+		}
+	}
+
+	// Combine results
+	evs = make(event.S, 0, len(replaceableEvents)+len(paramReplaceableEvents)+len(regularEvents))
+
+	for _, ev := range replaceableEvents {
+		evs = append(evs, ev)
+	}
+
+	for _, innerMap := range paramReplaceableEvents {
+		for _, ev := range innerMap {
+			evs = append(evs, ev)
+		}
+	}
+
+	evs = append(evs, regularEvents...)
+
+	// Re-sort by timestamp (newest first)
+	sort.Slice(evs, func(i, j int) bool {
+		return evs[i].CreatedAt > evs[j].CreatedAt
+	})
+
+	// Re-apply limit after filtering
+	if f.Limit != nil && len(evs) > int(*f.Limit) {
+		evs = evs[:*f.Limit]
+	}
+
 	return evs, nil
 }
```
```diff
@@ -56,6 +56,15 @@ func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) {
 		return true, nil // Event already exists
 	}

+	// For parameterized replaceable events (kinds 30000-39999), delete older versions
+	// before saving the new one. This ensures Neo4j only stores the latest version.
+	if ev.Kind >= 30000 && ev.Kind < 40000 {
+		if err := n.deleteOlderParameterizedReplaceable(c, ev); err != nil {
+			n.Logger.Warningf("failed to delete older replaceable events: %v", err)
+			// Continue with save - older events will be filtered at query time
+		}
+	}
+
 	// Get next serial number
 	serial, err := n.getNextSerial()
 	if err != nil {
@@ -444,3 +453,37 @@ ORDER BY e.created_at DESC`
 	return wouldReplace, serials, nil
 }
+
+// deleteOlderParameterizedReplaceable deletes older versions of parameterized replaceable events
+// (kinds 30000-39999) that have the same pubkey, kind, and d-tag value.
+// This is called before saving a new event to ensure only the latest version is stored.
+func (n *N) deleteOlderParameterizedReplaceable(c context.Context, ev *event.E) error {
+	authorPubkey := hex.Enc(ev.Pubkey[:])
+
+	// Get the d-tag value
+	dTag := ev.Tags.GetFirst([]byte{'d'})
+	dValue := ""
+	if dTag != nil && len(dTag.T) >= 2 {
+		dValue = string(dTag.T[1])
+	}
+
+	// Delete older events with same pubkey, kind, and d-tag.
+	// Only delete if the existing event is older than the new one.
+	cypher := `
+MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue})
+WHERE e.created_at < $createdAt
+DETACH DELETE e`
+
+	params := map[string]any{
+		"pubkey":    authorPubkey,
+		"kind":      int64(ev.Kind),
+		"dValue":    dValue,
+		"createdAt": ev.CreatedAt,
+	}
+
+	if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
+		return fmt.Errorf("failed to delete older replaceable events: %w", err)
+	}
+
+	return nil
+}
```
```diff
@@ -2,6 +2,24 @@

 package storage

+// TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
+// The current implementation has the following issues that need to be addressed:
+//
+// 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
+//    "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
+//    This happens when GC deletes events while many REQ queries are being processed.
+//
+// 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
+//    - Serialized or use a transaction pool to prevent concurrent txn issues
+//    - Batched with proper delays between batches to avoid overwhelming Badger
+//    - Rate-limited based on current system load
+//
+// 3. The current 10ms delay every 100 events (line ~237) is insufficient for busy relays.
+//    Consider adaptive rate limiting based on pending transaction count.
+//
+// 4. Consider using Badger's WriteBatch API instead of individual Update transactions
+//    for bulk deletions, which may be more efficient and avoid some race conditions.
+
 import (
 	"context"
 	"sync"
```
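Item 4 of the TODO refers to Badger's `WriteBatch` API, which coalesces many deletes into internally managed transactions instead of opening one `db.Update` per event. A self-contained sketch of that approach (the key layout and database path here are illustrative, not ORLY's actual schema):

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

// deleteKeysBatched removes keys via Badger's WriteBatch API rather than one
// Update transaction per event. WriteBatch commits and reopens transactions
// internally as they fill, so the caller never hits ErrTxnTooBig.
func deleteKeysBatched(db *badger.DB, keys [][]byte) error {
	wb := db.NewWriteBatch()
	defer wb.Cancel() // safe no-op after a successful Flush
	for _, k := range keys {
		if err := wb.Delete(k); err != nil {
			return err
		}
	}
	return wb.Flush() // commits any pending writes
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-gc-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical event keys; real GC would derive these from its index scan.
	if err := deleteKeysBatched(db, [][]byte{[]byte("evt:1"), []byte("evt:2")}); err != nil {
		log.Fatal(err)
	}
}
```

Because a single WriteBatch owns its transactions, this also gives the serialization the other TODO items ask for, provided only one GC pass runs at a time.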
```diff
@@ -1 +1 @@
-v0.52.4
+v0.52.9
```