Add archive relay query augmentation and access-based GC (v0.45.0)

- Add async archive relay querying (local results immediate, archives in background)
- Add query caching with filter normalization to avoid repeated requests
- Add session-deduplicated access tracking for events
- Add continuous garbage collection based on access patterns
- Auto-detect storage limit (80% of filesystem capacity) when ORLY_MAX_STORAGE_BYTES=0
- Support NIP-50 search queries to archive relays
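
The storage auto-detection code is not among the files shown below. A minimal sketch of how the 80% figure could be derived on Linux with the standard library's syscall.Statfs (the helper name and data-directory argument are illustrative, not this commit's implementation):

import "syscall"

// autoDetectMaxStorage is a hypothetical helper: when ORLY_MAX_STORAGE_BYTES=0,
// derive the limit as 80% of the capacity of the filesystem holding dataDir.
// Linux-specific sketch; the actual implementation in this commit may differ.
func autoDetectMaxStorage(dataDir string) (uint64, error) {
	var st syscall.Statfs_t
	if err := syscall.Statfs(dataDir, &st); err != nil {
		return 0, err
	}
	// Total capacity of the filesystem that holds the data directory.
	total := st.Blocks * uint64(st.Bsize)
	return total * 80 / 100, nil
}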

New environment variables:
- ORLY_ARCHIVE_ENABLED: Enable archive relay query augmentation
- ORLY_ARCHIVE_RELAYS: Comma-separated archive relay URLs
- ORLY_ARCHIVE_TIMEOUT_SEC: Archive query timeout in seconds (the manager falls back to 30s when this is unset or 0)
- ORLY_ARCHIVE_CACHE_TTL_HRS: Query deduplication window in hours (falls back to 24h when unset or 0)
- ORLY_GC_ENABLED: Enable access-based garbage collection
- ORLY_MAX_STORAGE_BYTES: Maximum storage in bytes (0 = auto-detect 80% of filesystem capacity)
- ORLY_GC_INTERVAL_SEC: GC check interval
- ORLY_GC_BATCH_SIZE: Events per GC cycle
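
As a sketch of how the archive variables map onto the archive package's Config (the env helpers getEnvBool and getEnvInt are illustrative, not part of this commit; the GC variables are consumed elsewhere):

cfg := archive.Config{
	Enabled:     getEnvBool("ORLY_ARCHIVE_ENABLED"),
	Relays:      strings.Split(os.Getenv("ORLY_ARCHIVE_RELAYS"), ","),
	TimeoutSec:  getEnvInt("ORLY_ARCHIVE_TIMEOUT_SEC"),   // <= 0 falls back to 30s in New
	CacheTTLHrs: getEnvInt("ORLY_ARCHIVE_CACHE_TTL_HRS"), // <= 0 falls back to 24h in New
}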

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: woikos
Date: 2026-01-02 19:35:16 +01:00
Commit: 8a14cec3cd (parent: 0008d33792)
19 changed files with 1718 additions and 2 deletions

pkg/archive/archive.go (new file, 283 lines added)

@@ -0,0 +1,283 @@
// Package archive provides query augmentation from authoritative archive relays.
// It manages connections to archive relays and fetches events that match local
// queries, caching them locally for future access.
package archive
import (
"context"
"sync"
"time"
"lol.mleku.dev/log"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// ArchiveDatabase defines the interface for storing fetched events.
type ArchiveDatabase interface {
SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
}
// EventDeliveryChannel defines the interface for streaming results back to clients.
type EventDeliveryChannel interface {
SendEvent(ev *event.E) error
IsConnected() bool
}
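
A minimal sketch of types that satisfy these two interfaces, e.g. for tests (the types are illustrative, not part of this commit, and reuse the imports listed above):

// memoryDB is a hypothetical in-memory ArchiveDatabase for tests.
type memoryDB struct {
	mu     sync.Mutex
	events map[string]*event.E
}

func (d *memoryDB) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	id := string(ev.ID[:])
	if _, ok := d.events[id]; ok {
		return true, nil
	}
	d.events[id] = ev
	return false, nil
}

// chanDelivery is a hypothetical EventDeliveryChannel backed by a Go channel.
type chanDelivery struct{ out chan *event.E }

func (c *chanDelivery) SendEvent(ev *event.E) error { c.out <- ev; return nil }
func (c *chanDelivery) IsConnected() bool           { return true }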
// Manager handles connections to archive relays for query augmentation.
type Manager struct {
ctx context.Context
cancel context.CancelFunc
relays []string
timeout time.Duration
db ArchiveDatabase
queryCache *QueryCache
// Connection pool
mu sync.RWMutex
connections map[string]*RelayConnection
// Configuration
enabled bool
}
// Config holds the configuration for the archive manager.
type Config struct {
Enabled bool
Relays []string
TimeoutSec int
CacheTTLHrs int
}
// New creates a new archive manager.
func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager {
if !cfg.Enabled || len(cfg.Relays) == 0 {
return &Manager{enabled: false}
}
mgrCtx, cancel := context.WithCancel(ctx)
timeout := time.Duration(cfg.TimeoutSec) * time.Second
if timeout <= 0 {
timeout = 30 * time.Second
}
cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour
if cacheTTL <= 0 {
cacheTTL = 24 * time.Hour
}
m := &Manager{
ctx: mgrCtx,
cancel: cancel,
relays: cfg.Relays,
timeout: timeout,
db: db,
queryCache: NewQueryCache(cacheTTL, 100000), // 100k cached queries
connections: make(map[string]*RelayConnection),
enabled: true,
}
log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL",
len(cfg.Relays), timeout, cacheTTL)
return m
}
// IsEnabled returns whether the archive manager is enabled.
func (m *Manager) IsEnabled() bool {
return m.enabled
}
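
Typical wiring, assuming a store that satisfies ArchiveDatabase (the relay URL and values are illustrative):

mgr := archive.New(ctx, db, archive.Config{
	Enabled:     true,
	Relays:      []string{"wss://archive.example.com"},
	TimeoutSec:  30,
	CacheTTLHrs: 24,
})
defer mgr.Stop()
if mgr.IsEnabled() {
	// archive query augmentation is active
}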
// QueryArchive queries archive relays asynchronously and stores/streams results.
// This should be called in a goroutine after returning local results.
//
// Parameters:
// - subID: the subscription ID for the query
// - connID: the connection ID (for access tracking)
// - f: the filter to query
// - delivered: map of event IDs already delivered to the client
// - listener: optional channel to stream results back (may be nil)
func (m *Manager) QueryArchive(
subID string,
connID string,
f *filter.F,
delivered map[string]struct{},
listener EventDeliveryChannel,
) {
if !m.enabled {
return
}
// Check if this query was recently executed
if m.queryCache.HasQueried(f) {
log.D.F("archive: query cache hit, skipping archive query for sub %s", subID)
return
}
// Mark query as executed
m.queryCache.MarkQueried(f)
// Create query context with timeout
queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout)
defer cancel()
// Query all relays in parallel
var wg sync.WaitGroup
results := make(chan *event.E, 1000)
for _, relayURL := range m.relays {
wg.Add(1)
go func(url string) {
defer wg.Done()
m.queryRelay(queryCtx, url, f, results)
}(relayURL)
}
// Close results channel when all relays are done
go func() {
wg.Wait()
close(results)
}()
// Process results
stored := 0
streamed := 0
for ev := range results {
// Skip if already delivered
evIDStr := string(ev.ID[:])
if _, exists := delivered[evIDStr]; exists {
continue
}
// Store event
exists, err := m.db.SaveEvent(queryCtx, ev)
if err != nil {
log.D.F("archive: failed to save event: %v", err)
continue
}
if !exists {
stored++
}
// Stream to client if still connected
if listener != nil && listener.IsConnected() {
if err := listener.SendEvent(ev); err == nil {
streamed++
delivered[evIDStr] = struct{}{}
}
}
}
if stored > 0 || streamed > 0 {
log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed)
}
}
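
How a REQ handler might hand off to the archive after serving local results (the variables subID, connID, f, localResults and listener are illustrative; only QueryArchive is from this commit):

delivered := make(map[string]struct{})
for _, ev := range localResults {
	delivered[string(ev.ID[:])] = struct{}{}
}
// Fire-and-forget: archives are queried in the background; new events are
// stored locally and streamed to the client while it stays connected.
go mgr.QueryArchive(subID, connID, f, delivered, listener)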
// queryRelay queries a single archive relay and sends results to the channel.
func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) {
conn, err := m.getOrCreateConnection(url)
if err != nil {
log.D.F("archive: failed to connect to %s: %v", url, err)
return
}
events, err := conn.Query(ctx, f)
if err != nil {
log.D.F("archive: query failed on %s: %v", url, err)
return
}
for _, ev := range events {
select {
case <-ctx.Done():
return
case results <- ev:
}
}
}
// getOrCreateConnection returns an existing connection or creates a new one.
func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) {
m.mu.RLock()
conn, exists := m.connections[url]
m.mu.RUnlock()
if exists && conn.IsConnected() {
return conn, nil
}
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock
conn, exists = m.connections[url]
if exists && conn.IsConnected() {
return conn, nil
}
// Create new connection
conn = NewRelayConnection(m.ctx, url)
if err := conn.Connect(); err != nil {
return nil, err
}
m.connections[url] = conn
return conn, nil
}
// Stop stops the archive manager and closes all connections.
func (m *Manager) Stop() {
if !m.enabled {
return
}
m.cancel()
m.mu.Lock()
defer m.mu.Unlock()
for _, conn := range m.connections {
conn.Close()
}
m.connections = make(map[string]*RelayConnection)
log.I.F("archive manager stopped")
}
// Stats returns current archive manager statistics.
func (m *Manager) Stats() ManagerStats {
if !m.enabled {
return ManagerStats{}
}
m.mu.RLock()
defer m.mu.RUnlock()
connected := 0
for _, conn := range m.connections {
if conn.IsConnected() {
connected++
}
}
return ManagerStats{
Enabled: m.enabled,
TotalRelays: len(m.relays),
ConnectedRelays: connected,
CachedQueries: m.queryCache.Len(),
MaxCachedQueries: m.queryCache.MaxSize(),
}
}
// ManagerStats holds archive manager statistics.
type ManagerStats struct {
Enabled bool
TotalRelays int
ConnectedRelays int
CachedQueries int
MaxCachedQueries int
}
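
The stats can be surfaced for monitoring, for example with a periodic log line (the ticker loop is illustrative; log.I.F mirrors the package's existing logging):

go func() {
	t := time.NewTicker(time.Minute)
	defer t.Stop()
	for range t.C {
		s := mgr.Stats()
		log.I.F("archive: %d/%d relays connected, %d/%d queries cached",
			s.ConnectedRelays, s.TotalRelays, s.CachedQueries, s.MaxCachedQueries)
	}
}()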

pkg/archive/connection.go (new file, 175 lines added)

@@ -0,0 +1,175 @@
package archive
import (
"context"
"sync"
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/ws"
"lol.mleku.dev/log"
)
// RelayConnection manages a single archive relay connection.
type RelayConnection struct {
url string
client *ws.Client
ctx context.Context
cancel context.CancelFunc
// Connection state
mu sync.RWMutex
lastConnect time.Time
reconnectDelay time.Duration
connected bool
}
const (
// Initial delay between reconnection attempts
initialReconnectDelay = 5 * time.Second
// Maximum delay between reconnection attempts
maxReconnectDelay = 5 * time.Minute
// Connection timeout
connectTimeout = 10 * time.Second
// Query timeout (per query, not global)
queryTimeout = 30 * time.Second
)
// NewRelayConnection creates a new relay connection.
func NewRelayConnection(parentCtx context.Context, url string) *RelayConnection {
ctx, cancel := context.WithCancel(parentCtx)
return &RelayConnection{
url: url,
ctx: ctx,
cancel: cancel,
reconnectDelay: initialReconnectDelay,
}
}
// Connect establishes a connection to the archive relay.
func (rc *RelayConnection) Connect() error {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.connected && rc.client != nil {
return nil
}
connectCtx, cancel := context.WithTimeout(rc.ctx, connectTimeout)
defer cancel()
client, err := ws.RelayConnect(connectCtx, rc.url)
if err != nil {
rc.reconnectDelay = min(rc.reconnectDelay*2, maxReconnectDelay)
return err
}
rc.client = client
rc.connected = true
rc.lastConnect = time.Now()
rc.reconnectDelay = initialReconnectDelay
log.D.F("archive: connected to %s", rc.url)
return nil
}
// Query executes a query against the archive relay.
// Returns a slice of events matching the filter.
func (rc *RelayConnection) Query(ctx context.Context, f *filter.F) ([]*event.E, error) {
rc.mu.RLock()
client := rc.client
connected := rc.connected
rc.mu.RUnlock()
if !connected || client == nil {
if err := rc.Connect(); err != nil {
return nil, err
}
rc.mu.RLock()
client = rc.client
rc.mu.RUnlock()
}
// Create query context with timeout
queryCtx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// Subscribe to the filter
sub, err := client.Subscribe(queryCtx, filter.NewS(f))
if err != nil {
rc.handleDisconnection()
return nil, err
}
defer sub.Unsub()
// Collect events until EOSE or timeout
var events []*event.E
for {
select {
case <-queryCtx.Done():
return events, nil
case <-sub.EndOfStoredEvents:
return events, nil
case ev := <-sub.Events:
if ev == nil {
return events, nil
}
events = append(events, ev)
}
}
}
// handleDisconnection marks the connection as disconnected.
func (rc *RelayConnection) handleDisconnection() {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.connected = false
if rc.client != nil {
rc.client.Close()
rc.client = nil
}
}
// IsConnected returns whether the relay is currently connected.
func (rc *RelayConnection) IsConnected() bool {
rc.mu.RLock()
defer rc.mu.RUnlock()
if !rc.connected || rc.client == nil {
return false
}
// Check if client is still connected
return rc.client.IsConnected()
}
// Close closes the relay connection.
func (rc *RelayConnection) Close() {
rc.cancel()
rc.mu.Lock()
defer rc.mu.Unlock()
rc.connected = false
if rc.client != nil {
rc.client.Close()
rc.client = nil
}
}
// URL returns the relay URL.
func (rc *RelayConnection) URL() string {
return rc.url
}
// min returns the smaller of two durations.
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
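
RelayConnection is driven by the Manager, but its lifecycle in isolation looks like this (the URL and the filter f are illustrative):

rc := NewRelayConnection(ctx, "wss://archive.example.com")
if err := rc.Connect(); err != nil {
	log.D.F("archive: connect to %s failed: %v", rc.URL(), err)
	return
}
defer rc.Close()
events, err := rc.Query(ctx, f)
if err != nil {
	log.D.F("archive: query on %s failed: %v", rc.URL(), err)
	return
}
log.D.F("archive: %d events from %s", len(events), rc.URL())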

pkg/archive/query_cache.go (new file, 238 lines added)

@@ -0,0 +1,238 @@
package archive
import (
"container/list"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"sort"
"sync"
"time"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// QueryCache tracks which filters have been queried recently to avoid
// repeated requests to archive relays for the same filter.
type QueryCache struct {
mu sync.RWMutex
entries map[string]*list.Element
order *list.List
maxSize int
ttl time.Duration
}
// queryCacheEntry holds a cached query fingerprint and timestamp.
type queryCacheEntry struct {
fingerprint string
queriedAt time.Time
}
// NewQueryCache creates a new query cache.
func NewQueryCache(ttl time.Duration, maxSize int) *QueryCache {
if maxSize <= 0 {
maxSize = 100000
}
if ttl <= 0 {
ttl = 24 * time.Hour
}
return &QueryCache{
entries: make(map[string]*list.Element),
order: list.New(),
maxSize: maxSize,
ttl: ttl,
}
}
// HasQueried returns true if the filter was queried within the TTL.
func (qc *QueryCache) HasQueried(f *filter.F) bool {
fingerprint := qc.normalizeAndHash(f)
// Read the timestamp while still holding the lock so a concurrent
// MarkQueried updating queriedAt cannot race with this read.
qc.mu.RLock()
elem, exists := qc.entries[fingerprint]
var queriedAt time.Time
if exists {
queriedAt = elem.Value.(*queryCacheEntry).queriedAt
}
qc.mu.RUnlock()
if !exists {
return false
}
// Check if still within TTL
if time.Since(queriedAt) > qc.ttl {
// Expired - remove it
qc.mu.Lock()
if elem, exists := qc.entries[fingerprint]; exists {
delete(qc.entries, fingerprint)
qc.order.Remove(elem)
}
qc.mu.Unlock()
return false
}
return true
}
// MarkQueried marks a filter as having been queried.
func (qc *QueryCache) MarkQueried(f *filter.F) {
fingerprint := qc.normalizeAndHash(f)
qc.mu.Lock()
defer qc.mu.Unlock()
// Update existing entry
if elem, exists := qc.entries[fingerprint]; exists {
qc.order.MoveToFront(elem)
elem.Value.(*queryCacheEntry).queriedAt = time.Now()
return
}
// Evict oldest if at capacity
if len(qc.entries) >= qc.maxSize {
oldest := qc.order.Back()
if oldest != nil {
entry := oldest.Value.(*queryCacheEntry)
delete(qc.entries, entry.fingerprint)
qc.order.Remove(oldest)
}
}
// Add new entry
entry := &queryCacheEntry{
fingerprint: fingerprint,
queriedAt: time.Now(),
}
elem := qc.order.PushFront(entry)
qc.entries[fingerprint] = elem
}
// normalizeAndHash creates a canonical fingerprint for a filter.
// This ensures that differently-ordered filters with the same content
// produce identical fingerprints.
func (qc *QueryCache) normalizeAndHash(f *filter.F) string {
h := sha256.New()
// Normalize and hash IDs (sorted)
if f.Ids != nil && f.Ids.Len() > 0 {
ids := make([]string, 0, f.Ids.Len())
for _, id := range f.Ids.T {
ids = append(ids, string(id))
}
sort.Strings(ids)
h.Write([]byte("ids:"))
for _, id := range ids {
h.Write([]byte(id))
}
}
// Normalize and hash Authors (sorted)
if f.Authors != nil && f.Authors.Len() > 0 {
authors := make([]string, 0, f.Authors.Len())
for _, author := range f.Authors.T {
authors = append(authors, string(author))
}
sort.Strings(authors)
h.Write([]byte("authors:"))
for _, a := range authors {
h.Write([]byte(a))
}
}
// Normalize and hash Kinds (sorted)
if f.Kinds != nil && f.Kinds.Len() > 0 {
kinds := f.Kinds.ToUint16()
sort.Slice(kinds, func(i, j int) bool { return kinds[i] < kinds[j] })
h.Write([]byte("kinds:"))
for _, k := range kinds {
var buf [2]byte
binary.BigEndian.PutUint16(buf[:], k)
h.Write(buf[:])
}
}
// Normalize and hash Tags (sorted by key, then values)
if f.Tags != nil && f.Tags.Len() > 0 {
// Collect all tag keys and sort them
tagMap := make(map[string][]string)
for _, t := range *f.Tags {
if t.Len() > 0 {
key := string(t.Key())
values := make([]string, 0, t.Len()-1)
for j := 1; j < t.Len(); j++ {
values = append(values, string(t.T[j]))
}
sort.Strings(values)
tagMap[key] = values
}
}
// Sort keys and hash
keys := make([]string, 0, len(tagMap))
for k := range tagMap {
keys = append(keys, k)
}
sort.Strings(keys)
h.Write([]byte("tags:"))
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(":"))
for _, v := range tagMap[k] {
h.Write([]byte(v))
}
}
}
// Hash Since timestamp
if f.Since != nil {
h.Write([]byte("since:"))
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(f.Since.V))
h.Write(buf[:])
}
// Hash Until timestamp
if f.Until != nil {
h.Write([]byte("until:"))
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(f.Until.V))
h.Write(buf[:])
}
// Hash Limit
if f.Limit != nil && *f.Limit > 0 {
h.Write([]byte("limit:"))
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(*f.Limit))
h.Write(buf[:])
}
// Hash Search (NIP-50)
if len(f.Search) > 0 {
h.Write([]byte("search:"))
h.Write(f.Search)
}
return hex.EncodeToString(h.Sum(nil))
}
// Len returns the number of cached queries.
func (qc *QueryCache) Len() int {
qc.mu.RLock()
defer qc.mu.RUnlock()
return len(qc.entries)
}
// MaxSize returns the maximum cache size.
func (qc *QueryCache) MaxSize() int {
return qc.maxSize
}
// Clear removes all entries from the cache.
func (qc *QueryCache) Clear() {
qc.mu.Lock()
defer qc.mu.Unlock()
qc.entries = make(map[string]*list.Element)
qc.order.Init()
}