Compare commits

..

5 Commits

Author SHA1 Message Date
woikos
ea7bc75fac Fix NIP-11 caching and export streaming issues (v0.46.2)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Fix Content-Type header being set on request instead of response
- Add Vary: Accept header to prevent browser/CDN caching NIP-11 for HTML
- Add periodic flushing during export for HTTP streaming (every 100 events)
- Add initial flush after headers to start streaming immediately
- Add X-Content-Type-Options: nosniff to prevent browser buffering

Files modified:
- app/handle-relayinfo.go: Fix header and add Vary: Accept
- app/server.go: Add initial flush and nosniff header for export
- pkg/database/export.go: Add periodic and final flushing for streaming

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 07:17:48 +01:00
woikos
2e9cde01f8 Refactor Tor to subprocess mode, enabled by default (v0.46.1)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Spawn tor binary as subprocess instead of requiring external daemon
- Auto-generate torrc in $ORLY_DATA_DIR/tor/ (userspace, no root)
- Enable Tor by default; gracefully disable if tor binary not found
- Add ORLY_TOR_BINARY and ORLY_TOR_SOCKS config options
- Remove external Tor setup scripts and documentation

Files modified:
- app/config/config.go: New subprocess-based Tor config options
- app/main.go: Updated Tor initialization for new config
- pkg/tor/service.go: Rewritten for subprocess management
- Removed: deploy/orly-tor.service, docs/TOR_SETUP.md, scripts/tor-*.sh

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 06:01:09 +01:00
woikos
25d087697e Add Tor hidden service support and fallback relay profile fetching (v0.46.0)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add pkg/tor package for Tor hidden service integration
- Add Tor config options: ORLY_TOR_ENABLED, ORLY_TOR_PORT, ORLY_TOR_HS_DIR, ORLY_TOR_ONION_ADDRESS
- Extend NIP-11 relay info with addresses field for .onion URLs
- Add fallback relays (Damus, nos.lol, nostr.band, purplepag.es) for profile lookups
- Refactor profile fetching to try local relay first, then fallback relays
- Add Tor setup documentation and deployment scripts

Files modified:
- app/config/config.go: Add Tor configuration options
- app/handle-relayinfo.go: Add ExtendedRelayInfo with addresses field
- app/main.go: Initialize and manage Tor service lifecycle
- app/server.go: Add torService field to Server struct
- app/web/src/constants.js: Add FALLBACK_RELAYS
- app/web/src/nostr.js: Add fallback relay profile fetching
- pkg/tor/: New package for Tor hidden service management
- docs/TOR_SETUP.md: Documentation for Tor configuration
- deploy/orly-tor.service: Systemd service for Tor integration
- scripts/tor-*.sh: Setup scripts for Tor development and production

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 05:50:03 +01:00
woikos
6056446a73 Add script to enable archive features on deployment 2026-01-02 19:57:19 +01:00
woikos
8a14cec3cd Add archive relay query augmentation and access-based GC (v0.45.0)
- Add async archive relay querying (local results immediate, archives in background)
- Add query caching with filter normalization to avoid repeated requests
- Add session-deduplicated access tracking for events
- Add continuous garbage collection based on access patterns
- Auto-detect storage limit (80% of filesystem) when ORLY_MAX_STORAGE_BYTES=0
- Support NIP-50 search queries to archive relays

New environment variables:
- ORLY_ARCHIVE_ENABLED: Enable archive relay query augmentation
- ORLY_ARCHIVE_RELAYS: Comma-separated archive relay URLs
- ORLY_ARCHIVE_TIMEOUT_SEC: Archive query timeout
- ORLY_ARCHIVE_CACHE_TTL_HRS: Query deduplication window
- ORLY_GC_ENABLED: Enable access-based garbage collection
- ORLY_MAX_STORAGE_BYTES: Max storage (0=auto 80%)
- ORLY_GC_INTERVAL_SEC: GC check interval
- ORLY_GC_BATCH_SIZE: Events per GC cycle

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 19:35:16 +01:00
29 changed files with 2507 additions and 60 deletions

View File

@@ -149,6 +149,13 @@ type C struct {
BunkerEnabled bool `env:"ORLY_BUNKER_ENABLED" default:"false" usage:"enable NIP-46 bunker signing service (requires WireGuard)"`
BunkerPort int `env:"ORLY_BUNKER_PORT" default:"3335" usage:"internal port for bunker WebSocket (only accessible via WireGuard)"`
// Tor hidden service configuration (subprocess mode - runs tor binary automatically)
TorEnabled bool `env:"ORLY_TOR_ENABLED" default:"true" usage:"enable Tor hidden service (spawns tor subprocess; disable with false if tor not installed)"`
TorPort int `env:"ORLY_TOR_PORT" default:"3336" usage:"internal port for Tor hidden service traffic"`
TorDataDir string `env:"ORLY_TOR_DATA_DIR" usage:"Tor data directory (default: $ORLY_DATA_DIR/tor)"`
TorBinary string `env:"ORLY_TOR_BINARY" default:"tor" usage:"path to tor binary (default: search in PATH)"`
TorSOCKS int `env:"ORLY_TOR_SOCKS" default:"0" usage:"SOCKS port for outbound Tor connections (0=disabled)"`
// Cashu access token configuration (NIP-XX)
CashuEnabled bool `env:"ORLY_CASHU_ENABLED" default:"false" usage:"enable Cashu blind signature tokens for access control"`
CashuTokenTTL string `env:"ORLY_CASHU_TOKEN_TTL" default:"168h" usage:"token validity duration (default: 1 week)"`
@@ -160,6 +167,18 @@ type C struct {
// Cluster replication configuration
ClusterPropagatePrivilegedEvents bool `env:"ORLY_CLUSTER_PROPAGATE_PRIVILEGED_EVENTS" default:"true" usage:"propagate privileged events (DMs, gift wraps, etc.) to relay peers for replication"`
// Archive relay configuration (query augmentation from authoritative archives)
ArchiveEnabled bool `env:"ORLY_ARCHIVE_ENABLED" default:"false" usage:"enable archive relay query augmentation (fetch from archives, cache locally)"`
ArchiveRelays []string `env:"ORLY_ARCHIVE_RELAYS" default:"wss://archive.orly.dev/" usage:"comma-separated list of archive relay URLs for query augmentation"`
ArchiveTimeoutSec int `env:"ORLY_ARCHIVE_TIMEOUT_SEC" default:"30" usage:"timeout in seconds for archive relay queries"`
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
// Storage management configuration (access-based garbage collection)
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
// ServeMode is set programmatically by the 'serve' subcommand to grant full owner
// access to all users (no env tag - internal use only)
ServeMode bool
@@ -590,3 +609,54 @@ func (cfg *C) GetCashuConfigValues() (
scopes,
cfg.CashuReauthorize
}
// GetArchiveConfigValues returns the archive relay configuration values.
// This avoids circular imports with pkg/archive while allowing main.go to construct
// the archive manager configuration.
func (cfg *C) GetArchiveConfigValues() (
enabled bool,
relays []string,
timeoutSec int,
cacheTTLHrs int,
) {
return cfg.ArchiveEnabled,
cfg.ArchiveRelays,
cfg.ArchiveTimeoutSec,
cfg.ArchiveCacheTTLHrs
}
// GetStorageConfigValues returns the storage management configuration values.
// This avoids circular imports with pkg/storage while allowing main.go to construct
// the garbage collector and access tracker configuration.
func (cfg *C) GetStorageConfigValues() (
maxStorageBytes int64,
gcEnabled bool,
gcIntervalSec int,
gcBatchSize int,
) {
return cfg.MaxStorageBytes,
cfg.GCEnabled,
cfg.GCIntervalSec,
cfg.GCBatchSize
}
// GetTorConfigValues returns the Tor hidden service configuration values.
// This avoids circular imports with pkg/tor while allowing main.go to construct
// the Tor service configuration.
func (cfg *C) GetTorConfigValues() (
enabled bool,
port int,
dataDir string,
binary string,
socksPort int,
) {
dataDir = cfg.TorDataDir
if dataDir == "" {
dataDir = filepath.Join(cfg.DataDir, "tor")
}
return cfg.TorEnabled,
cfg.TorPort,
dataDir,
cfg.TorBinary,
cfg.TorSOCKS
}

View File

@@ -15,6 +15,13 @@ import (
"next.orly.dev/pkg/version"
)
// ExtendedRelayInfo extends the standard NIP-11 relay info with additional fields.
// The Addresses field contains alternative WebSocket URLs for the relay (e.g., .onion).
type ExtendedRelayInfo struct {
*relayinfo.T
Addresses []string `json:"addresses,omitempty"`
}
// HandleRelayInfo generates and returns a relay information document in JSON
// format based on the server's configuration and supported NIPs.
//
@@ -30,7 +37,8 @@ import (
// Informer interface implementation or predefined server configuration. It
// returns this document as a JSON response to the client.
func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
r.Header.Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Vary", "Accept")
log.D.Ln("handling relay information document")
var info *relayinfo.T
nips := []relayinfo.NIP{
@@ -138,6 +146,32 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
},
Icon: icon,
}
if err := json.NewEncoder(w).Encode(info); chk.E(err) {
// Build addresses list from config and Tor service
var addresses []string
// Add configured relay addresses
if len(s.Config.RelayAddresses) > 0 {
addresses = append(addresses, s.Config.RelayAddresses...)
}
// Add Tor hidden service address if available
if s.torService != nil {
if onionAddr := s.torService.OnionWSAddress(); onionAddr != "" {
addresses = append(addresses, onionAddr)
}
}
// Return extended info if we have addresses, otherwise standard info
if len(addresses) > 0 {
extInfo := &ExtendedRelayInfo{
T: info,
Addresses: addresses,
}
if err := json.NewEncoder(w).Encode(extInfo); chk.E(err) {
}
} else {
if err := json.NewEncoder(w).Encode(info); chk.E(err) {
}
}
}

View File

@@ -690,6 +690,31 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
Write(l); chk.E(err) {
return
}
// Record access for returned events (for GC access-based ranking)
if l.accessTracker != nil && len(events) > 0 {
go func(evts event.S, connID string) {
for _, ev := range evts {
if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
l.accessTracker.RecordAccess(ser.Get(), connID)
}
}
}(events, l.connectionID)
}
// Trigger archive relay query if enabled (background fetch + stream results)
if l.archiveManager != nil && l.archiveManager.IsEnabled() && len(*env.Filters) > 0 {
// Use first filter for archive query
f := (*env.Filters)[0]
go l.archiveManager.QueryArchive(
string(env.Subscription),
l.connectionID,
f,
seen,
l, // implements EventDeliveryChannel
)
}
// if the query was for just Ids, we know there can't be any more results,
// so cancel the subscription.
cancel := true

View File

@@ -3,6 +3,7 @@ package app
import (
"context"
"crypto/rand"
"fmt"
"net/http"
"strings"
"time"
@@ -99,15 +100,17 @@ whitelist:
handlerSemSize = 100 // Default if not configured
}
now := time.Now()
listener := &Listener{
ctx: ctx,
cancel: cancel,
Server: s,
conn: conn,
remote: remote,
connectionID: fmt.Sprintf("%s-%d", remote, now.UnixNano()), // Unique connection ID for access tracking
req: r,
cashuToken: cashuToken, // Verified Cashu access token (nil if none provided)
startTime: time.Now(),
startTime: now,
writeChan: make(chan publish.WriteRequest, 100), // Buffered channel for writes
writeDone: make(chan struct{}),
messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing

View File

@@ -28,6 +28,7 @@ type Listener struct {
ctx context.Context
cancel context.CancelFunc // Cancel function for this listener's context
remote string
connectionID string // Unique identifier for this connection (for access tracking)
req *http.Request
challenge atomicutils.Bytes
authedPubkey atomicutils.Bytes
@@ -112,6 +113,29 @@ func (l *Listener) Write(p []byte) (n int, err error) {
}
}
// SendEvent sends an event to the client. Implements archive.EventDeliveryChannel.
func (l *Listener) SendEvent(ev *event.E) error {
if ev == nil {
return nil
}
// Serialize the event as an EVENT envelope
data := ev.Serialize()
// Use Write to send
_, err := l.Write(data)
return err
}
// IsConnected returns whether the client connection is still active.
// Implements archive.EventDeliveryChannel.
func (l *Listener) IsConnected() bool {
select {
case <-l.ctx.Done():
return false
default:
return l.conn != nil
}
}
// WriteControl sends a control message through the write channel
func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) {
// Defensive: recover from any panic when sending to closed channel

View File

@@ -29,8 +29,11 @@ import (
cashuiface "next.orly.dev/pkg/interfaces/cashu"
"next.orly.dev/pkg/ratelimit"
"next.orly.dev/pkg/spider"
"next.orly.dev/pkg/storage"
dsync "next.orly.dev/pkg/sync"
"next.orly.dev/pkg/wireguard"
"next.orly.dev/pkg/archive"
"next.orly.dev/pkg/tor"
"git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
)
@@ -512,6 +515,68 @@ func Run(
}
}
// Initialize access tracker for storage management (only for Badger backend)
if badgerDB, ok := db.(*database.D); ok {
l.accessTracker = storage.NewAccessTracker(badgerDB, 100000) // 100k dedup cache
l.accessTracker.Start()
log.I.F("access tracker initialized")
// Initialize garbage collector if enabled
maxBytes, gcEnabled, gcIntervalSec, gcBatchSize := cfg.GetStorageConfigValues()
if gcEnabled {
gcCfg := storage.GCConfig{
MaxStorageBytes: maxBytes,
Interval: time.Duration(gcIntervalSec) * time.Second,
BatchSize: gcBatchSize,
MinAgeSec: 3600, // Minimum 1 hour before eviction
}
l.garbageCollector = storage.NewGarbageCollector(ctx, badgerDB, l.accessTracker, gcCfg)
l.garbageCollector.Start()
log.I.F("garbage collector started (interval: %ds, batch: %d)", gcIntervalSec, gcBatchSize)
}
}
// Initialize archive relay manager if enabled
archiveEnabled, archiveRelays, archiveTimeoutSec, archiveCacheTTLHrs := cfg.GetArchiveConfigValues()
if archiveEnabled && len(archiveRelays) > 0 {
archiveCfg := archive.Config{
Enabled: true,
Relays: archiveRelays,
TimeoutSec: archiveTimeoutSec,
CacheTTLHrs: archiveCacheTTLHrs,
}
l.archiveManager = archive.New(ctx, db, archiveCfg)
log.I.F("archive relay manager initialized with %d relays", len(archiveRelays))
}
// Initialize Tor hidden service if enabled (spawns tor subprocess)
torEnabled, torPort, torDataDir, torBinary, torSOCKSPort := cfg.GetTorConfigValues()
if torEnabled {
torCfg := &tor.Config{
Port: torPort,
DataDir: torDataDir,
Binary: torBinary,
SOCKSPort: torSOCKSPort,
Handler: l,
}
var err error
l.torService, err = tor.New(torCfg)
if err != nil {
log.W.F("Tor disabled: %v", err)
} else {
if err = l.torService.Start(); err != nil {
log.W.F("failed to start Tor service: %v", err)
l.torService = nil
} else {
if addr := l.torService.OnionWSAddress(); addr != "" {
log.I.F("Tor hidden service listening on port %d, address: %s", torPort, addr)
} else {
log.I.F("Tor hidden service listening on port %d (waiting for .onion address)", torPort)
}
}
}
}
// Start rate limiter if enabled
if limiter != nil && limiter.IsEnabled() {
limiter.Start()
@@ -621,6 +686,30 @@ func Run(
log.I.F("rate limiter stopped")
}
// Stop archive manager if running
if l.archiveManager != nil {
l.archiveManager.Stop()
log.I.F("archive manager stopped")
}
// Stop Tor service if running
if l.torService != nil {
l.torService.Stop()
log.I.F("Tor service stopped")
}
// Stop garbage collector if running
if l.garbageCollector != nil {
l.garbageCollector.Stop()
log.I.F("garbage collector stopped")
}
// Stop access tracker if running
if l.accessTracker != nil {
l.accessTracker.Stop()
log.I.F("access tracker stopped")
}
// Stop bunker server if running
if l.bunkerServer != nil {
l.bunkerServer.Stop()

View File

@@ -38,8 +38,11 @@ import (
"next.orly.dev/pkg/cashu/verifier"
"next.orly.dev/pkg/ratelimit"
"next.orly.dev/pkg/spider"
"next.orly.dev/pkg/storage"
dsync "next.orly.dev/pkg/sync"
"next.orly.dev/pkg/wireguard"
"next.orly.dev/pkg/archive"
"next.orly.dev/pkg/tor"
)
type Server struct {
@@ -91,6 +94,14 @@ type Server struct {
// Cashu access token system (NIP-XX)
CashuIssuer *issuer.Issuer
CashuVerifier *verifier.Verifier
// Archive relay and storage management
archiveManager *archive.Manager
accessTracker *storage.AccessTracker
garbageCollector *storage.GarbageCollector
// Tor hidden service
torService *tor.Service
}
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
@@ -727,6 +738,12 @@ func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
w.Header().Set(
"Content-Disposition", "attachment; filename=\""+filename+"\"",
)
w.Header().Set("X-Content-Type-Options", "nosniff")
// Flush headers to start streaming immediately
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// Stream export
s.DB.Export(s.Ctx, w, pks...)

View File

@@ -1,5 +1,6 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "svelte-app",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -7,6 +7,14 @@ export const DEFAULT_RELAYS = [
`${wsProtocol}//${window.location.host}/`,
];
// Fallback relays for profile lookups when local relay doesn't have the data
export const FALLBACK_RELAYS = [
'wss://relay.damus.io',
'wss://nos.lol',
'wss://relay.nostr.band',
'wss://purplepag.es',
];
// Replaceable kinds for the recovery dropdown
// Based on NIP-01: kinds 0, 3, and 10000-19999 are replaceable
// kinds 30000-39999 are addressable (parameterized replaceable)

View File

@@ -1,7 +1,7 @@
import { SimplePool } from 'nostr-tools/pool';
import { EventStore } from 'applesauce-core';
import { PrivateKeySigner } from 'applesauce-signers';
import { DEFAULT_RELAYS } from "./constants.js";
import { DEFAULT_RELAYS, FALLBACK_RELAYS } from "./constants.js";
// Nostr client wrapper using nostr-tools
class NostrClient {
@@ -450,65 +450,115 @@ export async function fetchUserProfile(pubkey) {
console.warn("Failed to load cached profile", e);
}
// 2) Fetch profile from relays
try {
const filters = [{
kinds: [0],
authors: [pubkey],
limit: 1
}];
const filters = [{
kinds: [0],
authors: [pubkey],
limit: 1
}];
// 2) Fetch profile from local relay first
try {
const events = await fetchEvents(filters, { timeout: 10000 });
if (events.length > 0) {
const profileEvent = events[0];
console.log("Profile fetched:", profileEvent);
// Cache the event
await putEvent(profileEvent);
// Publish the profile event to the local relay
try {
console.log("Publishing profile event to local relay:", profileEvent.id);
await nostrClient.publish(profileEvent);
console.log("Profile event successfully saved to local relay");
} catch (publishError) {
console.warn("Failed to publish profile to local relay:", publishError);
// Don't fail the whole operation if publishing fails
}
// Parse profile data
const profile = parseProfileFromEvent(profileEvent);
// Notify listeners that an updated profile is available
try {
if (typeof window !== "undefined" && window.dispatchEvent) {
window.dispatchEvent(
new CustomEvent("profile-updated", {
detail: { pubkey, profile, event: profileEvent },
}),
);
}
} catch (e) {
console.warn("Failed to dispatch profile-updated event", e);
}
return profile;
} else {
// No profile found - create a default profile for new users
console.log("No profile found for pubkey, creating default:", pubkey);
return await createDefaultProfile(pubkey);
console.log("Profile fetched from local relay:", profileEvent);
return processProfileEvent(profileEvent, pubkey);
}
} catch (error) {
console.error("Failed to fetch profile:", error);
// Try to create default profile on error too
try {
return await createDefaultProfile(pubkey);
} catch (e) {
console.error("Failed to create default profile:", e);
return null;
}
console.warn("Failed to fetch profile from local relay:", error);
}
// 3) Try fallback relays if local relay doesn't have the profile
console.log("Profile not found on local relay, trying fallback relays:", FALLBACK_RELAYS);
try {
const profileEvent = await fetchProfileFromFallbackRelays(pubkey, filters);
if (profileEvent) {
return processProfileEvent(profileEvent, pubkey);
}
} catch (error) {
console.warn("Failed to fetch profile from fallback relays:", error);
}
// 4) No profile found anywhere - create a default profile for new users
console.log("No profile found for pubkey, creating default:", pubkey);
try {
return await createDefaultProfile(pubkey);
} catch (e) {
console.error("Failed to create default profile:", e);
return null;
}
}
// Helper to fetch profile from fallback relays
async function fetchProfileFromFallbackRelays(pubkey, filters) {
return new Promise((resolve) => {
const events = [];
const timeoutId = setTimeout(() => {
sub.close();
// Return the most recent profile event
if (events.length > 0) {
events.sort((a, b) => b.created_at - a.created_at);
resolve(events[0]);
} else {
resolve(null);
}
}, 5000);
const sub = nostrClient.pool.subscribeMany(
FALLBACK_RELAYS,
filters,
{
onevent(event) {
console.log("Profile event received from fallback relay:", event.id?.substring(0, 8));
events.push(event);
},
oneose() {
clearTimeout(timeoutId);
sub.close();
if (events.length > 0) {
events.sort((a, b) => b.created_at - a.created_at);
resolve(events[0]);
} else {
resolve(null);
}
}
}
);
});
}
// Helper to process and cache a profile event
async function processProfileEvent(profileEvent, pubkey) {
// Cache the event
await putEvent(profileEvent);
// Publish the profile event to the local relay
try {
console.log("Publishing profile event to local relay:", profileEvent.id);
await nostrClient.publish(profileEvent);
console.log("Profile event successfully saved to local relay");
} catch (publishError) {
console.warn("Failed to publish profile to local relay:", publishError);
}
// Parse profile data
const profile = parseProfileFromEvent(profileEvent);
// Notify listeners that an updated profile is available
try {
if (typeof window !== "undefined" && window.dispatchEvent) {
window.dispatchEvent(
new CustomEvent("profile-updated", {
detail: { pubkey, profile, event: profileEvent },
}),
);
}
} catch (e) {
console.warn("Failed to dispatch profile-updated event", e);
}
return profile;
}
/**

View File

@@ -278,6 +278,17 @@ func (w *RelySQLiteWrapper) EventIdsBySerial(start uint64, count int) (evs []uin
return nil, fmt.Errorf("not implemented")
}
// Access tracking stubs (not needed for benchmarking)
func (w *RelySQLiteWrapper) RecordEventAccess(serial uint64, connectionID string) error {
return nil // No-op for benchmarking
}
func (w *RelySQLiteWrapper) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
return 0, 0, nil
}
func (w *RelySQLiteWrapper) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
return nil, nil
}
// Helper function to check if a kind is replaceable
func isReplaceableKind(kind int) bool {
return (kind >= 10000 && kind < 20000) || kind == 0 || kind == 3

283
pkg/archive/archive.go Normal file
View File

@@ -0,0 +1,283 @@
// Package archive provides query augmentation from authoritative archive relays.
// It manages connections to archive relays and fetches events that match local
// queries, caching them locally for future access.
package archive
import (
"context"
"sync"
"time"
"lol.mleku.dev/log"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// ArchiveDatabase defines the interface for storing fetched events.
type ArchiveDatabase interface {
SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
}
// EventDeliveryChannel defines the interface for streaming results back to clients.
type EventDeliveryChannel interface {
SendEvent(ev *event.E) error
IsConnected() bool
}
// Manager handles connections to archive relays for query augmentation.
type Manager struct {
ctx context.Context
cancel context.CancelFunc
relays []string
timeout time.Duration
db ArchiveDatabase
queryCache *QueryCache
// Connection pool
mu sync.RWMutex
connections map[string]*RelayConnection
// Configuration
enabled bool
}
// Config holds the configuration for the archive manager.
type Config struct {
Enabled bool
Relays []string
TimeoutSec int
CacheTTLHrs int
}
// New creates a new archive manager.
func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager {
if !cfg.Enabled || len(cfg.Relays) == 0 {
return &Manager{enabled: false}
}
mgrCtx, cancel := context.WithCancel(ctx)
timeout := time.Duration(cfg.TimeoutSec) * time.Second
if timeout <= 0 {
timeout = 30 * time.Second
}
cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour
if cacheTTL <= 0 {
cacheTTL = 24 * time.Hour
}
m := &Manager{
ctx: mgrCtx,
cancel: cancel,
relays: cfg.Relays,
timeout: timeout,
db: db,
queryCache: NewQueryCache(cacheTTL, 100000), // 100k cached queries
connections: make(map[string]*RelayConnection),
enabled: true,
}
log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL",
len(cfg.Relays), timeout, cacheTTL)
return m
}
// IsEnabled returns whether the archive manager is enabled.
func (m *Manager) IsEnabled() bool {
return m.enabled
}
// QueryArchive queries archive relays asynchronously and stores/streams results.
// This should be called in a goroutine after returning local results.
//
// Parameters:
// - subID: the subscription ID for the query
// - connID: the connection ID (for access tracking)
// - f: the filter to query
// - delivered: map of event IDs already delivered to the client
// - listener: optional channel to stream results back (may be nil)
func (m *Manager) QueryArchive(
subID string,
connID string,
f *filter.F,
delivered map[string]struct{},
listener EventDeliveryChannel,
) {
if !m.enabled {
return
}
// Check if this query was recently executed
if m.queryCache.HasQueried(f) {
log.D.F("archive: query cache hit, skipping archive query for sub %s", subID)
return
}
// Mark query as executed
m.queryCache.MarkQueried(f)
// Create query context with timeout
queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout)
defer cancel()
// Query all relays in parallel
var wg sync.WaitGroup
results := make(chan *event.E, 1000)
for _, relayURL := range m.relays {
wg.Add(1)
go func(url string) {
defer wg.Done()
m.queryRelay(queryCtx, url, f, results)
}(relayURL)
}
// Close results channel when all relays are done
go func() {
wg.Wait()
close(results)
}()
// Process results
stored := 0
streamed := 0
for ev := range results {
// Skip if already delivered
evIDStr := string(ev.ID[:])
if _, exists := delivered[evIDStr]; exists {
continue
}
// Store event
exists, err := m.db.SaveEvent(queryCtx, ev)
if err != nil {
log.D.F("archive: failed to save event: %v", err)
continue
}
if !exists {
stored++
}
// Stream to client if still connected
if listener != nil && listener.IsConnected() {
if err := listener.SendEvent(ev); err == nil {
streamed++
delivered[evIDStr] = struct{}{}
}
}
}
if stored > 0 || streamed > 0 {
log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed)
}
}
// queryRelay queries a single archive relay and sends results to the channel.
func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) {
conn, err := m.getOrCreateConnection(url)
if err != nil {
log.D.F("archive: failed to connect to %s: %v", url, err)
return
}
events, err := conn.Query(ctx, f)
if err != nil {
log.D.F("archive: query failed on %s: %v", url, err)
return
}
for _, ev := range events {
select {
case <-ctx.Done():
return
case results <- ev:
}
}
}
// getOrCreateConnection returns an existing connection or creates a new one.
func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) {
m.mu.RLock()
conn, exists := m.connections[url]
m.mu.RUnlock()
if exists && conn.IsConnected() {
return conn, nil
}
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock
conn, exists = m.connections[url]
if exists && conn.IsConnected() {
return conn, nil
}
// Create new connection
conn = NewRelayConnection(m.ctx, url)
if err := conn.Connect(); err != nil {
return nil, err
}
m.connections[url] = conn
return conn, nil
}
// Stop stops the archive manager and closes all connections.
func (m *Manager) Stop() {
if !m.enabled {
return
}
m.cancel()
m.mu.Lock()
defer m.mu.Unlock()
for _, conn := range m.connections {
conn.Close()
}
m.connections = make(map[string]*RelayConnection)
log.I.F("archive manager stopped")
}
// Stats returns current archive manager statistics.
func (m *Manager) Stats() ManagerStats {
if !m.enabled {
return ManagerStats{}
}
m.mu.RLock()
defer m.mu.RUnlock()
connected := 0
for _, conn := range m.connections {
if conn.IsConnected() {
connected++
}
}
return ManagerStats{
Enabled: m.enabled,
TotalRelays: len(m.relays),
ConnectedRelays: connected,
CachedQueries: m.queryCache.Len(),
MaxCachedQueries: m.queryCache.MaxSize(),
}
}
// ManagerStats holds archive manager statistics.
type ManagerStats struct {
Enabled bool
TotalRelays int
ConnectedRelays int
CachedQueries int
MaxCachedQueries int
}

175
pkg/archive/connection.go Normal file
View File

@@ -0,0 +1,175 @@
package archive
import (
"context"
"sync"
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/ws"
"lol.mleku.dev/log"
)
// RelayConnection manages a single archive relay connection.
type RelayConnection struct {
url string
client *ws.Client
ctx context.Context
cancel context.CancelFunc
// Connection state
mu sync.RWMutex
lastConnect time.Time
reconnectDelay time.Duration
connected bool
}
const (
// Initial delay between reconnection attempts
initialReconnectDelay = 5 * time.Second
// Maximum delay between reconnection attempts
maxReconnectDelay = 5 * time.Minute
// Connection timeout
connectTimeout = 10 * time.Second
// Query timeout (per query, not global)
queryTimeout = 30 * time.Second
)
// NewRelayConnection creates a new relay connection.
func NewRelayConnection(parentCtx context.Context, url string) *RelayConnection {
ctx, cancel := context.WithCancel(parentCtx)
return &RelayConnection{
url: url,
ctx: ctx,
cancel: cancel,
reconnectDelay: initialReconnectDelay,
}
}
// Connect establishes a connection to the archive relay.
func (rc *RelayConnection) Connect() error {
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.connected && rc.client != nil {
return nil
}
connectCtx, cancel := context.WithTimeout(rc.ctx, connectTimeout)
defer cancel()
client, err := ws.RelayConnect(connectCtx, rc.url)
if err != nil {
rc.reconnectDelay = min(rc.reconnectDelay*2, maxReconnectDelay)
return err
}
rc.client = client
rc.connected = true
rc.lastConnect = time.Now()
rc.reconnectDelay = initialReconnectDelay
log.D.F("archive: connected to %s", rc.url)
return nil
}
// Query executes a query against the archive relay.
// Returns a slice of events matching the filter.
func (rc *RelayConnection) Query(ctx context.Context, f *filter.F) ([]*event.E, error) {
rc.mu.RLock()
client := rc.client
connected := rc.connected
rc.mu.RUnlock()
if !connected || client == nil {
if err := rc.Connect(); err != nil {
return nil, err
}
rc.mu.RLock()
client = rc.client
rc.mu.RUnlock()
}
// Create query context with timeout
queryCtx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
// Subscribe to the filter
sub, err := client.Subscribe(queryCtx, filter.NewS(f))
if err != nil {
rc.handleDisconnection()
return nil, err
}
defer sub.Unsub()
// Collect events until EOSE or timeout
var events []*event.E
for {
select {
case <-queryCtx.Done():
return events, nil
case <-sub.EndOfStoredEvents:
return events, nil
case ev := <-sub.Events:
if ev == nil {
return events, nil
}
events = append(events, ev)
}
}
}
// handleDisconnection marks the connection as disconnected.
func (rc *RelayConnection) handleDisconnection() {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.connected = false
if rc.client != nil {
rc.client.Close()
rc.client = nil
}
}
// IsConnected returns whether the relay is currently connected.
func (rc *RelayConnection) IsConnected() bool {
rc.mu.RLock()
defer rc.mu.RUnlock()
if !rc.connected || rc.client == nil {
return false
}
// Check if client is still connected
return rc.client.IsConnected()
}
// Close closes the relay connection.
func (rc *RelayConnection) Close() {
rc.cancel()
rc.mu.Lock()
defer rc.mu.Unlock()
rc.connected = false
if rc.client != nil {
rc.client.Close()
rc.client = nil
}
}
// URL returns the relay URL.
func (rc *RelayConnection) URL() string {
return rc.url
}
// min returns the smaller of two durations.
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

238
pkg/archive/query_cache.go Normal file
View File

@@ -0,0 +1,238 @@
package archive
import (
"container/list"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"sort"
"sync"
"time"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// QueryCache tracks which filters have been queried recently to avoid
// repeated requests to archive relays for the same filter.
type QueryCache struct {
mu sync.RWMutex
entries map[string]*list.Element
order *list.List
maxSize int
ttl time.Duration
}
// queryCacheEntry holds a cached query fingerprint and timestamp.
type queryCacheEntry struct {
fingerprint string
queriedAt time.Time
}
// NewQueryCache creates a new query cache.
func NewQueryCache(ttl time.Duration, maxSize int) *QueryCache {
if maxSize <= 0 {
maxSize = 100000
}
if ttl <= 0 {
ttl = 24 * time.Hour
}
return &QueryCache{
entries: make(map[string]*list.Element),
order: list.New(),
maxSize: maxSize,
ttl: ttl,
}
}
// HasQueried returns true if the filter was queried within the TTL.
func (qc *QueryCache) HasQueried(f *filter.F) bool {
fingerprint := qc.normalizeAndHash(f)
qc.mu.RLock()
elem, exists := qc.entries[fingerprint]
qc.mu.RUnlock()
if !exists {
return false
}
entry := elem.Value.(*queryCacheEntry)
// Check if still within TTL
if time.Since(entry.queriedAt) > qc.ttl {
// Expired - remove it
qc.mu.Lock()
if elem, exists := qc.entries[fingerprint]; exists {
delete(qc.entries, fingerprint)
qc.order.Remove(elem)
}
qc.mu.Unlock()
return false
}
return true
}
// MarkQueried marks a filter as having been queried.
func (qc *QueryCache) MarkQueried(f *filter.F) {
fingerprint := qc.normalizeAndHash(f)
qc.mu.Lock()
defer qc.mu.Unlock()
// Update existing entry
if elem, exists := qc.entries[fingerprint]; exists {
qc.order.MoveToFront(elem)
elem.Value.(*queryCacheEntry).queriedAt = time.Now()
return
}
// Evict oldest if at capacity
if len(qc.entries) >= qc.maxSize {
oldest := qc.order.Back()
if oldest != nil {
entry := oldest.Value.(*queryCacheEntry)
delete(qc.entries, entry.fingerprint)
qc.order.Remove(oldest)
}
}
// Add new entry
entry := &queryCacheEntry{
fingerprint: fingerprint,
queriedAt: time.Now(),
}
elem := qc.order.PushFront(entry)
qc.entries[fingerprint] = elem
}
// normalizeAndHash creates a canonical fingerprint for a filter.
// This ensures that differently-ordered filters with the same content
// produce identical fingerprints.
func (qc *QueryCache) normalizeAndHash(f *filter.F) string {
h := sha256.New()
// Normalize and hash IDs (sorted)
if f.Ids != nil && f.Ids.Len() > 0 {
ids := make([]string, 0, f.Ids.Len())
for _, id := range f.Ids.T {
ids = append(ids, string(id))
}
sort.Strings(ids)
h.Write([]byte("ids:"))
for _, id := range ids {
h.Write([]byte(id))
}
}
// Normalize and hash Authors (sorted)
if f.Authors != nil && f.Authors.Len() > 0 {
authors := make([]string, 0, f.Authors.Len())
for _, author := range f.Authors.T {
authors = append(authors, string(author))
}
sort.Strings(authors)
h.Write([]byte("authors:"))
for _, a := range authors {
h.Write([]byte(a))
}
}
// Normalize and hash Kinds (sorted)
if f.Kinds != nil && f.Kinds.Len() > 0 {
kinds := f.Kinds.ToUint16()
sort.Slice(kinds, func(i, j int) bool { return kinds[i] < kinds[j] })
h.Write([]byte("kinds:"))
for _, k := range kinds {
var buf [2]byte
binary.BigEndian.PutUint16(buf[:], k)
h.Write(buf[:])
}
}
// Normalize and hash Tags (sorted by key, then values)
if f.Tags != nil && f.Tags.Len() > 0 {
// Collect all tag keys and sort them
tagMap := make(map[string][]string)
for _, t := range *f.Tags {
if t.Len() > 0 {
key := string(t.Key())
values := make([]string, 0, t.Len()-1)
for j := 1; j < t.Len(); j++ {
values = append(values, string(t.T[j]))
}
sort.Strings(values)
tagMap[key] = values
}
}
// Sort keys and hash
keys := make([]string, 0, len(tagMap))
for k := range tagMap {
keys = append(keys, k)
}
sort.Strings(keys)
h.Write([]byte("tags:"))
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(":"))
for _, v := range tagMap[k] {
h.Write([]byte(v))
}
}
}
// Hash Since timestamp
if f.Since != nil {
h.Write([]byte("since:"))
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(f.Since.V))
h.Write(buf[:])
}
// Hash Until timestamp
if f.Until != nil {
h.Write([]byte("until:"))
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(f.Until.V))
h.Write(buf[:])
}
// Hash Limit
if f.Limit != nil && *f.Limit > 0 {
h.Write([]byte("limit:"))
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(*f.Limit))
h.Write(buf[:])
}
// Hash Search (NIP-50)
if len(f.Search) > 0 {
h.Write([]byte("search:"))
h.Write(f.Search)
}
return hex.EncodeToString(h.Sum(nil))
}
// Len returns the number of cached queries.
func (qc *QueryCache) Len() int {
qc.mu.RLock()
defer qc.mu.RUnlock()
return len(qc.entries)
}
// MaxSize returns the maximum cache size.
func (qc *QueryCache) MaxSize() int {
return qc.maxSize
}
// Clear removes all entries from the cache.
func (qc *QueryCache) Clear() {
qc.mu.Lock()
defer qc.mu.Unlock()
qc.entries = make(map[string]*list.Element)
qc.order.Init()
}

View File

@@ -0,0 +1,182 @@
//go:build !(js && wasm)
package database
import (
"encoding/binary"
"sort"
"time"
"github.com/dgraph-io/badger/v4"
)
const (
// accessTrackingPrefix is the key prefix for access tracking records.
// Key format: acc:{8-byte serial} -> {8-byte lastAccessTime}{4-byte accessCount}
accessTrackingPrefix = "acc:"
)
// RecordEventAccess updates access tracking for an event.
// This increments the access count and updates the last access time.
// The connectionID is currently not used for deduplication in the database layer,
// but is passed for potential future use. Deduplication is handled in the
// higher-level AccessTracker which maintains an in-memory cache.
func (d *D) RecordEventAccess(serial uint64, connectionID string) error {
key := d.accessKey(serial)
return d.Update(func(txn *badger.Txn) error {
var lastAccess int64
var accessCount uint32
// Try to get existing record
item, err := txn.Get(key)
if err == nil {
err = item.Value(func(val []byte) error {
if len(val) >= 12 {
lastAccess = int64(binary.BigEndian.Uint64(val[0:8]))
accessCount = binary.BigEndian.Uint32(val[8:12])
}
return nil
})
if err != nil {
return err
}
} else if err != badger.ErrKeyNotFound {
return err
}
// Update values
_ = lastAccess // unused in simple increment mode
lastAccess = time.Now().Unix()
accessCount++
// Write back
val := make([]byte, 12)
binary.BigEndian.PutUint64(val[0:8], uint64(lastAccess))
binary.BigEndian.PutUint32(val[8:12], accessCount)
return txn.Set(key, val)
})
}
// GetEventAccessInfo returns access information for an event.
// Returns (0, 0, nil) if the event has never been accessed.
func (d *D) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
key := d.accessKey(serial)
err = d.View(func(txn *badger.Txn) error {
item, gerr := txn.Get(key)
if gerr != nil {
if gerr == badger.ErrKeyNotFound {
// Not found is not an error - just return zeros
return nil
}
return gerr
}
return item.Value(func(val []byte) error {
if len(val) >= 12 {
lastAccess = int64(binary.BigEndian.Uint64(val[0:8]))
accessCount = binary.BigEndian.Uint32(val[8:12])
}
return nil
})
})
return
}
// accessEntry holds access metadata for sorting
type accessEntry struct {
serial uint64
lastAccess int64
count uint32
}
// GetLeastAccessedEvents returns event serials sorted by coldness.
// Events with older last access times and lower access counts are returned first.
// limit: maximum number of events to return
// minAgeSec: minimum age in seconds since last access (events accessed more recently are excluded)
func (d *D) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
cutoffTime := time.Now().Unix() - minAgeSec
var entries []accessEntry
err = d.View(func(txn *badger.Txn) error {
prefix := []byte(accessTrackingPrefix)
opts := badger.DefaultIteratorOptions
opts.Prefix = prefix
opts.PrefetchValues = true
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.Key()
// Extract serial from key (after prefix)
if len(key) <= len(prefix) {
continue
}
serial := binary.BigEndian.Uint64(key[len(prefix):])
var lastAccess int64
var accessCount uint32
err := item.Value(func(val []byte) error {
if len(val) >= 12 {
lastAccess = int64(binary.BigEndian.Uint64(val[0:8]))
accessCount = binary.BigEndian.Uint32(val[8:12])
}
return nil
})
if err != nil {
continue
}
// Only include events older than cutoff
if lastAccess < cutoffTime {
entries = append(entries, accessEntry{serial, lastAccess, accessCount})
}
}
return nil
})
if err != nil {
return nil, err
}
// Sort by coldness score (older + fewer accesses = colder = lower score)
// Score = lastAccess + (accessCount * 3600)
// Lower score = colder = evict first
sort.Slice(entries, func(i, j int) bool {
scoreI := entries[i].lastAccess + int64(entries[i].count)*3600
scoreJ := entries[j].lastAccess + int64(entries[j].count)*3600
return scoreI < scoreJ
})
// Return up to limit
for i := 0; i < len(entries) && i < limit; i++ {
serials = append(serials, entries[i].serial)
}
return serials, nil
}
// accessKey generates the database key for an access tracking record.
func (d *D) accessKey(serial uint64) []byte {
key := make([]byte, len(accessTrackingPrefix)+8)
copy(key, accessTrackingPrefix)
binary.BigEndian.PutUint64(key[len(accessTrackingPrefix):], serial)
return key
}
// DeleteAccessRecord removes the access tracking record for an event.
// This should be called when an event is deleted.
func (d *D) DeleteAccessRecord(serial uint64) error {
key := d.accessKey(serial)
return d.Update(func(txn *badger.Txn) error {
return txn.Delete(key)
})
}

View File

@@ -17,6 +17,11 @@ import (
"git.mleku.dev/mleku/nostr/utils/units"
)
// Flusher interface for HTTP streaming
type flusher interface {
Flush()
}
// Export the complete database of stored events to an io.Writer in line structured minified
// JSON. Supports both legacy and compact event formats.
func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
@@ -24,11 +29,18 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
evB := make([]byte, 0, units.Mb)
evBuf := bytes.NewBuffer(evB)
// Get flusher for HTTP streaming if available
var f flusher
if fl, ok := w.(flusher); ok {
f = fl
}
// Performance tracking
startTime := time.Now()
var eventCount, bytesWritten int64
lastLogTime := startTime
const logInterval = 5 * time.Second
const flushInterval = 100 // Flush every N events
log.I.F("export: starting export operation")
@@ -109,6 +121,11 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
eventCount++
ev.Free()
// Flush periodically for HTTP streaming
if f != nil && eventCount%flushInterval == 0 {
f.Flush()
}
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
@@ -169,6 +186,11 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
eventCount++
ev.Free()
// Flush periodically for HTTP streaming
if f != nil && eventCount%flushInterval == 0 {
f.Flush()
}
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
@@ -186,6 +208,11 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
return
}
// Final flush
if f != nil {
f.Flush()
}
// Final export summary
elapsed := time.Since(startTime)
eventsPerSec := float64(eventCount) / elapsed.Seconds()
@@ -244,6 +271,11 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
eventCount++
ev.Free()
// Flush periodically for HTTP streaming
if f != nil && eventCount%flushInterval == 0 {
f.Flush()
}
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
@@ -261,6 +293,11 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
}
}
// Final flush
if f != nil {
f.Flush()
}
// Final export summary for pubkey export
elapsed := time.Since(startTime)
eventsPerSec := float64(eventCount) / elapsed.Seconds()

View File

@@ -104,6 +104,16 @@ type Database interface {
CacheEvents(f *filter.F, events event.S)
InvalidateQueryCache()
// Access tracking for storage management (garbage collection based on access patterns)
// RecordEventAccess records an access to an event by a connection.
// The connectionID is used to deduplicate accesses from the same connection.
RecordEventAccess(serial uint64, connectionID string) error
// GetEventAccessInfo returns the last access time and access count for an event.
GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error)
// GetLeastAccessedEvents returns event serials sorted by coldness (oldest/lowest access).
// limit: max events to return, minAgeSec: minimum age in seconds since last access.
GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error)
// Utility methods
EventIdsBySerial(start uint64, count int) (evs []uint64, err error)
}

View File

@@ -0,0 +1,94 @@
package neo4j
import (
"context"
"fmt"
"time"
)
// RecordEventAccess updates access tracking for an event in Neo4j.
// This creates or updates an AccessTrack node for the event.
func (n *N) RecordEventAccess(serial uint64, connectionID string) error {
cypher := `
MERGE (a:AccessTrack {serial: $serial})
ON CREATE SET a.lastAccess = $now, a.count = 1
ON MATCH SET a.lastAccess = $now, a.count = a.count + 1`
params := map[string]any{
"serial": int64(serial), // Neo4j uses int64
"now": time.Now().Unix(),
}
_, err := n.ExecuteWrite(context.Background(), cypher, params)
if err != nil {
return fmt.Errorf("failed to record event access: %w", err)
}
return nil
}
// GetEventAccessInfo returns access information for an event.
func (n *N) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
cypher := "MATCH (a:AccessTrack {serial: $serial}) RETURN a.lastAccess AS lastAccess, a.count AS count"
params := map[string]any{"serial": int64(serial)}
result, err := n.ExecuteRead(context.Background(), cypher, params)
if err != nil {
return 0, 0, fmt.Errorf("failed to get event access info: %w", err)
}
ctx := context.Background()
if result.Next(ctx) {
record := result.Record()
if record != nil {
if la, found := record.Get("lastAccess"); found {
if v, ok := la.(int64); ok {
lastAccess = v
}
}
if c, found := record.Get("count"); found {
if v, ok := c.(int64); ok {
accessCount = uint32(v)
}
}
}
}
return lastAccess, accessCount, nil
}
// GetLeastAccessedEvents returns event serials sorted by coldness.
func (n *N) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
cutoffTime := time.Now().Unix() - minAgeSec
cypher := `
MATCH (a:AccessTrack)
WHERE a.lastAccess < $cutoff
RETURN a.serial AS serial, a.lastAccess AS lastAccess, a.count AS count
ORDER BY (a.lastAccess + a.count * 3600) ASC
LIMIT $limit`
params := map[string]any{
"cutoff": cutoffTime,
"limit": limit,
}
result, err := n.ExecuteRead(context.Background(), cypher, params)
if err != nil {
return nil, fmt.Errorf("failed to get least accessed events: %w", err)
}
ctx := context.Background()
for result.Next(ctx) {
record := result.Record()
if record != nil {
if s, found := record.Get("serial"); found {
if v, ok := s.(int64); ok {
serials = append(serials, uint64(v))
}
}
}
}
return serials, nil
}

View File

@@ -0,0 +1,163 @@
//go:build !windows
package storage
import (
"container/list"
"context"
"sync"
"lol.mleku.dev/log"
)
// AccessTrackerDatabase defines the interface for the underlying database
// that stores access tracking information.
type AccessTrackerDatabase interface {
RecordEventAccess(serial uint64, connectionID string) error
GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error)
GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error)
}
// accessKey is the composite key for deduplication: serial + connectionID
type accessKey struct {
Serial uint64
ConnectionID string
}
// AccessTracker tracks event access patterns with session deduplication.
// It maintains an in-memory cache to deduplicate accesses from the same
// connection, reducing database writes while ensuring unique session counting.
type AccessTracker struct {
db AccessTrackerDatabase
// Deduplication cache: tracks which (serial, connectionID) pairs
// have already been recorded in this session window
mu sync.RWMutex
seen map[accessKey]struct{}
seenOrder *list.List // LRU order for eviction
seenElements map[accessKey]*list.Element
maxSeen int // Maximum entries in dedup cache
// Flush interval for stats
ctx context.Context
cancel context.CancelFunc
}
// NewAccessTracker creates a new access tracker.
// maxSeenEntries controls the size of the deduplication cache.
func NewAccessTracker(db AccessTrackerDatabase, maxSeenEntries int) *AccessTracker {
if maxSeenEntries <= 0 {
maxSeenEntries = 100000 // Default: 100k entries
}
ctx, cancel := context.WithCancel(context.Background())
return &AccessTracker{
db: db,
seen: make(map[accessKey]struct{}),
seenOrder: list.New(),
seenElements: make(map[accessKey]*list.Element),
maxSeen: maxSeenEntries,
ctx: ctx,
cancel: cancel,
}
}
// RecordAccess records an access to an event by a connection.
// Deduplicates accesses from the same connection within the cache window.
// Returns true if this was a new access, false if deduplicated.
func (t *AccessTracker) RecordAccess(serial uint64, connectionID string) (bool, error) {
key := accessKey{Serial: serial, ConnectionID: connectionID}
t.mu.Lock()
// Check if already seen
if _, exists := t.seen[key]; exists {
// Move to front (most recent)
if elem, ok := t.seenElements[key]; ok {
t.seenOrder.MoveToFront(elem)
}
t.mu.Unlock()
return false, nil // Deduplicated
}
// Evict oldest if at capacity
if len(t.seen) >= t.maxSeen {
oldest := t.seenOrder.Back()
if oldest != nil {
oldKey := oldest.Value.(accessKey)
delete(t.seen, oldKey)
delete(t.seenElements, oldKey)
t.seenOrder.Remove(oldest)
}
}
// Add to cache
t.seen[key] = struct{}{}
elem := t.seenOrder.PushFront(key)
t.seenElements[key] = elem
t.mu.Unlock()
// Record to database
if err := t.db.RecordEventAccess(serial, connectionID); err != nil {
return true, err
}
return true, nil
}
// GetAccessInfo returns the access information for an event.
func (t *AccessTracker) GetAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
return t.db.GetEventAccessInfo(serial)
}
// GetColdestEvents returns event serials sorted by coldness.
// limit: max events to return
// minAgeSec: minimum age in seconds since last access
func (t *AccessTracker) GetColdestEvents(limit int, minAgeSec int64) ([]uint64, error) {
return t.db.GetLeastAccessedEvents(limit, minAgeSec)
}
// ClearConnection removes all dedup entries for a specific connection.
// Call this when a connection closes to free up cache space.
func (t *AccessTracker) ClearConnection(connectionID string) {
t.mu.Lock()
defer t.mu.Unlock()
// Find and remove all entries for this connection
for key, elem := range t.seenElements {
if key.ConnectionID == connectionID {
delete(t.seen, key)
delete(t.seenElements, key)
t.seenOrder.Remove(elem)
}
}
}
// Stats returns current cache statistics.
func (t *AccessTracker) Stats() AccessTrackerStats {
t.mu.RLock()
defer t.mu.RUnlock()
return AccessTrackerStats{
CachedEntries: len(t.seen),
MaxEntries: t.maxSeen,
}
}
// AccessTrackerStats holds access tracker statistics.
type AccessTrackerStats struct {
CachedEntries int
MaxEntries int
}
// Start starts any background goroutines for the tracker.
// Currently a no-op but provided for future use.
func (t *AccessTracker) Start() {
log.I.F("access tracker started with %d max dedup entries", t.maxSeen)
}
// Stop stops the access tracker and releases resources.
func (t *AccessTracker) Stop() {
t.cancel()
log.I.F("access tracker stopped")
}

278
pkg/storage/gc.go Normal file
View File

@@ -0,0 +1,278 @@
//go:build !windows
package storage
import (
"context"
"sync"
"sync/atomic"
"time"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes/types"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// GCDatabase defines the interface for database operations needed by the GC.
type GCDatabase interface {
Path() string
FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
DeleteEventBySerial(ctx context.Context, ser *types.Uint40, ev *event.E) error
}
// GarbageCollector manages continuous event eviction based on access patterns.
// It monitors storage usage and evicts the least accessed events when the
// storage limit is exceeded.
type GarbageCollector struct {
ctx context.Context
cancel context.CancelFunc
db GCDatabase
tracker *AccessTracker
// Configuration
dataDir string
maxBytes int64 // 0 = auto-calculate
interval time.Duration
batchSize int
minAgeSec int64 // Minimum age before considering for eviction
// State
mu sync.Mutex
running bool
evictedCount uint64
lastRun time.Time
}
// GCConfig holds configuration for the garbage collector.
type GCConfig struct {
MaxStorageBytes int64 // 0 = auto-calculate (80% of filesystem)
Interval time.Duration // How often to check storage
BatchSize int // Events to consider per GC run
MinAgeSec int64 // Minimum age before eviction (default: 1 hour)
}
// DefaultGCConfig returns a default GC configuration.
func DefaultGCConfig() GCConfig {
return GCConfig{
MaxStorageBytes: 0, // Auto-detect
Interval: time.Minute, // Check every minute
BatchSize: 1000, // 1000 events per run
MinAgeSec: 3600, // 1 hour minimum age
}
}
// NewGarbageCollector creates a new garbage collector.
func NewGarbageCollector(
ctx context.Context,
db GCDatabase,
tracker *AccessTracker,
cfg GCConfig,
) *GarbageCollector {
gcCtx, cancel := context.WithCancel(ctx)
if cfg.BatchSize <= 0 {
cfg.BatchSize = 1000
}
if cfg.Interval <= 0 {
cfg.Interval = time.Minute
}
if cfg.MinAgeSec <= 0 {
cfg.MinAgeSec = 3600 // 1 hour
}
return &GarbageCollector{
ctx: gcCtx,
cancel: cancel,
db: db,
tracker: tracker,
dataDir: db.Path(),
maxBytes: cfg.MaxStorageBytes,
interval: cfg.Interval,
batchSize: cfg.BatchSize,
minAgeSec: cfg.MinAgeSec,
}
}
// Start begins the garbage collection loop.
func (gc *GarbageCollector) Start() {
gc.mu.Lock()
if gc.running {
gc.mu.Unlock()
return
}
gc.running = true
gc.mu.Unlock()
go gc.runLoop()
log.I.F("garbage collector started (interval: %s, batch: %d)", gc.interval, gc.batchSize)
}
// Stop stops the garbage collector.
func (gc *GarbageCollector) Stop() {
gc.cancel()
gc.mu.Lock()
gc.running = false
gc.mu.Unlock()
log.I.F("garbage collector stopped (total evicted: %d)", atomic.LoadUint64(&gc.evictedCount))
}
// runLoop is the main GC loop.
func (gc *GarbageCollector) runLoop() {
ticker := time.NewTicker(gc.interval)
defer ticker.Stop()
for {
select {
case <-gc.ctx.Done():
return
case <-ticker.C:
if err := gc.runCycle(); err != nil {
log.W.F("GC cycle error: %v", err)
}
}
}
}
// runCycle executes one garbage collection cycle.
func (gc *GarbageCollector) runCycle() error {
gc.mu.Lock()
gc.lastRun = time.Now()
gc.mu.Unlock()
// Check if we need to run GC
shouldRun, currentBytes, maxBytes, err := gc.shouldRunGC()
if err != nil {
return err
}
if !shouldRun {
return nil
}
log.D.F("GC triggered: current=%d MB, max=%d MB (%.1f%%)",
currentBytes/(1024*1024),
maxBytes/(1024*1024),
float64(currentBytes)/float64(maxBytes)*100)
// Get coldest events
serials, err := gc.tracker.GetColdestEvents(gc.batchSize, gc.minAgeSec)
if err != nil {
return err
}
if len(serials) == 0 {
log.D.F("GC: no events eligible for eviction")
return nil
}
// Evict events
evicted, err := gc.evictEvents(serials)
if err != nil {
return err
}
atomic.AddUint64(&gc.evictedCount, uint64(evicted))
log.I.F("GC: evicted %d events (total: %d)", evicted, atomic.LoadUint64(&gc.evictedCount))
return nil
}
// shouldRunGC checks if storage limit is exceeded.
func (gc *GarbageCollector) shouldRunGC() (bool, int64, int64, error) {
// Calculate max storage (dynamic based on filesystem)
maxBytes, err := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
if err != nil {
return false, 0, 0, err
}
// Get current usage
currentBytes, err := GetCurrentStorageUsage(gc.dataDir)
if err != nil {
return false, 0, 0, err
}
return currentBytes > maxBytes, currentBytes, maxBytes, nil
}
// evictEvents evicts the specified events from the database.
func (gc *GarbageCollector) evictEvents(serials []uint64) (int, error) {
evicted := 0
for _, serial := range serials {
// Check context for cancellation
select {
case <-gc.ctx.Done():
return evicted, gc.ctx.Err()
default:
}
// Convert serial to Uint40
ser := &types.Uint40{}
if err := ser.Set(serial); err != nil {
log.D.F("GC: invalid serial %d: %v", serial, err)
continue
}
// Fetch the event
ev, err := gc.db.FetchEventBySerial(ser)
if err != nil {
log.D.F("GC: failed to fetch event %d: %v", serial, err)
continue
}
if ev == nil {
continue // Already deleted
}
// Delete the event
if err := gc.db.DeleteEventBySerial(gc.ctx, ser, ev); err != nil {
log.D.F("GC: failed to delete event %d: %v", serial, err)
continue
}
evicted++
// Rate limit to avoid overwhelming the database
if evicted%100 == 0 {
time.Sleep(10 * time.Millisecond)
}
}
return evicted, nil
}
// Stats returns current GC statistics.
func (gc *GarbageCollector) Stats() GCStats {
gc.mu.Lock()
lastRun := gc.lastRun
running := gc.running
gc.mu.Unlock()
// Get storage info
currentBytes, _ := GetCurrentStorageUsage(gc.dataDir)
maxBytes, _ := CalculateMaxStorage(gc.dataDir, gc.maxBytes)
var percentage float64
if maxBytes > 0 {
percentage = float64(currentBytes) / float64(maxBytes) * 100
}
return GCStats{
Running: running,
LastRunTime: lastRun,
TotalEvicted: atomic.LoadUint64(&gc.evictedCount),
CurrentStorageBytes: currentBytes,
MaxStorageBytes: maxBytes,
StoragePercentage: percentage,
}
}
// GCStats holds garbage collector statistics.
type GCStats struct {
Running bool
LastRunTime time.Time
TotalEvicted uint64
CurrentStorageBytes int64
MaxStorageBytes int64
StoragePercentage float64
}

65
pkg/storage/limits.go Normal file
View File

@@ -0,0 +1,65 @@
//go:build !windows
// Package storage provides storage management functionality including filesystem
// space detection, access tracking for events, and garbage collection based on
// access patterns.
package storage
import (
"syscall"
)
// FilesystemStats holds information about filesystem space usage.
type FilesystemStats struct {
Total uint64 // Total bytes on filesystem
Available uint64 // Available bytes (for unprivileged users)
Used uint64 // Used bytes
}
// GetFilesystemStats returns filesystem space information for the given path.
// The path should be a directory within the filesystem to check.
func GetFilesystemStats(path string) (stats FilesystemStats, err error) {
var stat syscall.Statfs_t
if err = syscall.Statfs(path, &stat); err != nil {
return
}
stats.Total = stat.Blocks * uint64(stat.Bsize)
stats.Available = stat.Bavail * uint64(stat.Bsize)
stats.Used = stats.Total - stats.Available
return
}
// CalculateMaxStorage calculates the maximum storage limit for the relay.
// If configuredMax > 0, it returns that value directly.
// Otherwise, it returns 80% of the available filesystem space.
func CalculateMaxStorage(dataDir string, configuredMax int64) (int64, error) {
if configuredMax > 0 {
return configuredMax, nil
}
stats, err := GetFilesystemStats(dataDir)
if err != nil {
return 0, err
}
// Return 80% of available space
maxBytes := int64(float64(stats.Available) * 0.8)
// Also ensure we don't exceed 80% of total filesystem
maxTotal := int64(float64(stats.Total) * 0.8)
if maxBytes > maxTotal {
maxBytes = maxTotal
}
return maxBytes, nil
}
// GetCurrentStorageUsage calculates the current storage usage of the data directory.
// This is an approximation based on filesystem stats for the given path.
func GetCurrentStorageUsage(dataDir string) (int64, error) {
stats, err := GetFilesystemStats(dataDir)
if err != nil {
return 0, err
}
return int64(stats.Used), err
}

View File

@@ -0,0 +1,38 @@
//go:build windows
package storage
import (
"errors"
)
// FilesystemStats holds information about filesystem space usage.
type FilesystemStats struct {
Total uint64 // Total bytes on filesystem
Available uint64 // Available bytes (for unprivileged users)
Used uint64 // Used bytes
}
// GetFilesystemStats returns filesystem space information for the given path.
// Windows implementation is not yet supported.
func GetFilesystemStats(path string) (stats FilesystemStats, err error) {
// TODO: Implement using syscall.GetDiskFreeSpaceEx
err = errors.New("filesystem stats not implemented on Windows")
return
}
// CalculateMaxStorage calculates the maximum storage limit for the relay.
// If configuredMax > 0, it returns that value directly.
// Windows auto-detection is not yet supported.
func CalculateMaxStorage(dataDir string, configuredMax int64) (int64, error) {
if configuredMax > 0 {
return configuredMax, nil
}
return 0, errors.New("auto-detect storage limit not implemented on Windows; set ORLY_MAX_STORAGE_BYTES manually")
}
// GetCurrentStorageUsage calculates the current storage usage of the data directory.
// Windows implementation is not yet supported.
func GetCurrentStorageUsage(dataDir string) (int64, error) {
return 0, errors.New("storage usage detection not implemented on Windows")
}

116
pkg/tor/hostname.go Normal file
View File

@@ -0,0 +1,116 @@
package tor
import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
// HostnameWatcher watches the Tor hidden service hostname file for changes.
// When Tor creates or updates a hidden service, it writes the .onion address
// to a file called "hostname" in the HiddenServiceDir.
type HostnameWatcher struct {
hsDir string
address string
onChange func(string)
stopCh chan struct{}
mu sync.RWMutex
}
// NewHostnameWatcher creates a new hostname watcher for the given HiddenServiceDir.
func NewHostnameWatcher(hsDir string) *HostnameWatcher {
return &HostnameWatcher{
hsDir: hsDir,
stopCh: make(chan struct{}),
}
}
// OnChange sets a callback function to be called when the hostname changes.
func (w *HostnameWatcher) OnChange(fn func(string)) {
w.mu.Lock()
w.onChange = fn
w.mu.Unlock()
}
// Start begins watching the hostname file.
func (w *HostnameWatcher) Start() error {
// Try to read immediately
if err := w.readHostname(); err != nil {
log.D.F("hostname file not yet available: %v", err)
}
// Start polling goroutine
go w.poll()
return nil
}
// Stop stops the hostname watcher.
func (w *HostnameWatcher) Stop() {
close(w.stopCh)
}
// Address returns the current .onion address.
func (w *HostnameWatcher) Address() string {
w.mu.RLock()
defer w.mu.RUnlock()
return w.address
}
// poll periodically checks the hostname file for changes.
func (w *HostnameWatcher) poll() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-w.stopCh:
return
case <-ticker.C:
if err := w.readHostname(); err != nil {
// Only log at trace level to avoid spam
log.T.F("hostname read: %v", err)
}
}
}
}
// readHostname reads the hostname file and updates the address if changed.
func (w *HostnameWatcher) readHostname() error {
path := filepath.Join(w.hsDir, "hostname")
data, err := os.ReadFile(path)
if chk.T(err) {
return err
}
// Parse the address (file contains "xyz.onion\n")
addr := strings.TrimSpace(string(data))
if addr == "" {
return nil
}
w.mu.Lock()
oldAddr := w.address
w.address = addr
onChange := w.onChange
w.mu.Unlock()
// Call callback if address changed
if addr != oldAddr && onChange != nil {
onChange(addr)
}
return nil
}
// HostnameFilePath returns the path to the hostname file.
func (w *HostnameWatcher) HostnameFilePath() string {
return filepath.Join(w.hsDir, "hostname")
}

358
pkg/tor/service.go Normal file
View File

@@ -0,0 +1,358 @@
// Package tor provides Tor hidden service integration for the ORLY relay.
// It spawns a tor subprocess with automatic configuration and manages
// the hidden service lifecycle.
package tor
import (
"bufio"
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
// Config holds Tor subprocess configuration.
type Config struct {
// Port is the internal port for the hidden service
Port int
// DataDir is the directory for Tor data (torrc, keys, hostname, etc.)
DataDir string
// Binary is the path to the tor executable
Binary string
// SOCKSPort is the port for outbound SOCKS connections (0 = disabled)
SOCKSPort int
// Handler is the HTTP handler to serve (typically the main relay handler)
Handler http.Handler
}
// Service manages the Tor subprocess and hidden service listener.
type Service struct {
cfg *Config
listener net.Listener
server *http.Server
// Tor subprocess
cmd *exec.Cmd
stdout io.ReadCloser
stderr io.ReadCloser
// onionAddress is the detected .onion address
onionAddress string
// hostname watcher
hostnameWatcher *HostnameWatcher
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
}
// New creates a new Tor service with the given configuration.
// Returns an error if the tor binary is not found.
func New(cfg *Config) (*Service, error) {
if cfg.Port == 0 {
cfg.Port = 3336
}
// Find tor binary
binary := cfg.Binary
if binary == "" {
binary = "tor"
}
torPath, err := exec.LookPath(binary)
if err != nil {
return nil, fmt.Errorf("tor binary not found: %w (install tor or set ORLY_TOR_ENABLED=false)", err)
}
cfg.Binary = torPath
// Ensure data directory exists
if err := os.MkdirAll(cfg.DataDir, 0700); err != nil {
return nil, fmt.Errorf("failed to create Tor data directory: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
s := &Service{
cfg: cfg,
ctx: ctx,
cancel: cancel,
}
return s, nil
}
// generateTorrc creates the torrc configuration file.
func (s *Service) generateTorrc() (string, error) {
torrcPath := filepath.Join(s.cfg.DataDir, "torrc")
hsDir := filepath.Join(s.cfg.DataDir, "hidden_service")
// Ensure hidden service directory exists with correct permissions
if err := os.MkdirAll(hsDir, 0700); err != nil {
return "", fmt.Errorf("failed to create hidden service directory: %w", err)
}
var sb strings.Builder
sb.WriteString("# ORLY Tor hidden service configuration\n")
sb.WriteString("# Auto-generated - do not edit\n\n")
// Data directory
sb.WriteString(fmt.Sprintf("DataDirectory %s/data\n", s.cfg.DataDir))
// Hidden service configuration
sb.WriteString(fmt.Sprintf("HiddenServiceDir %s\n", hsDir))
sb.WriteString(fmt.Sprintf("HiddenServicePort 80 127.0.0.1:%d\n", s.cfg.Port))
// Optional SOCKS port for outbound connections
if s.cfg.SOCKSPort > 0 {
sb.WriteString(fmt.Sprintf("SocksPort %d\n", s.cfg.SOCKSPort))
} else {
sb.WriteString("SocksPort 0\n")
}
// Disable unused features to reduce resource usage
sb.WriteString("ControlPort 0\n")
sb.WriteString("Log notice stdout\n")
// Write torrc
if err := os.WriteFile(torrcPath, []byte(sb.String()), 0600); err != nil {
return "", fmt.Errorf("failed to write torrc: %w", err)
}
// Create data subdirectory
if err := os.MkdirAll(filepath.Join(s.cfg.DataDir, "data"), 0700); err != nil {
return "", fmt.Errorf("failed to create Tor data subdirectory: %w", err)
}
return torrcPath, nil
}
// Start spawns the Tor subprocess and initializes the listener.
func (s *Service) Start() error {
// Generate torrc
torrcPath, err := s.generateTorrc()
if err != nil {
return err
}
log.I.F("starting Tor subprocess with config: %s", torrcPath)
// Start tor subprocess
s.cmd = exec.CommandContext(s.ctx, s.cfg.Binary, "-f", torrcPath)
// Capture stdout/stderr for logging
s.stdout, err = s.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to get Tor stdout: %w", err)
}
s.stderr, err = s.cmd.StderrPipe()
if err != nil {
return fmt.Errorf("failed to get Tor stderr: %w", err)
}
if err := s.cmd.Start(); err != nil {
return fmt.Errorf("failed to start Tor: %w", err)
}
log.I.F("Tor subprocess started (PID %d)", s.cmd.Process.Pid)
// Log Tor output
s.wg.Add(2)
go s.logOutput("tor", s.stdout)
go s.logOutput("tor", s.stderr)
// Monitor subprocess
s.wg.Add(1)
go s.monitorProcess()
// Start hostname watcher
hsDir := filepath.Join(s.cfg.DataDir, "hidden_service")
s.hostnameWatcher = NewHostnameWatcher(hsDir)
s.hostnameWatcher.OnChange(func(addr string) {
s.mu.Lock()
s.onionAddress = addr
s.mu.Unlock()
log.I.F("Tor hidden service address: %s", addr)
})
if err := s.hostnameWatcher.Start(); err != nil {
log.W.F("failed to start hostname watcher: %v", err)
} else {
// Get initial address
if addr := s.hostnameWatcher.Address(); addr != "" {
s.mu.Lock()
s.onionAddress = addr
s.mu.Unlock()
}
}
// Create listener for the hidden service port
addr := fmt.Sprintf("127.0.0.1:%d", s.cfg.Port)
s.listener, err = net.Listen("tcp", addr)
if chk.E(err) {
s.Stop()
return fmt.Errorf("failed to listen on %s: %w", addr, err)
}
// Create HTTP server with the provided handler
s.server = &http.Server{
Handler: s.cfg.Handler,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
}
// Start serving
s.wg.Add(1)
go func() {
defer s.wg.Done()
log.I.F("Tor hidden service listener started on %s", addr)
if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed {
log.E.F("Tor server error: %v", err)
}
}()
return nil
}
// logOutput reads from a pipe and logs each line.
func (s *Service) logOutput(prefix string, r io.ReadCloser) {
defer s.wg.Done()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
// Filter out common noise
if strings.Contains(line, "Bootstrapped") {
log.I.F("[%s] %s", prefix, line)
} else if strings.Contains(line, "[warn]") || strings.Contains(line, "[err]") {
log.W.F("[%s] %s", prefix, line)
} else {
log.D.F("[%s] %s", prefix, line)
}
}
}
// monitorProcess watches the Tor subprocess and logs when it exits.
func (s *Service) monitorProcess() {
defer s.wg.Done()
err := s.cmd.Wait()
if err != nil {
select {
case <-s.ctx.Done():
// Expected shutdown
log.D.F("Tor subprocess exited (shutdown)")
default:
log.E.F("Tor subprocess exited unexpectedly: %v", err)
}
} else {
log.I.F("Tor subprocess exited cleanly")
}
}
// Stop gracefully shuts down the Tor service.
func (s *Service) Stop() error {
s.cancel()
// Stop hostname watcher
if s.hostnameWatcher != nil {
s.hostnameWatcher.Stop()
}
// Shutdown HTTP server
if s.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.server.Shutdown(ctx); chk.E(err) {
// Continue shutdown anyway
}
}
// Close listener
if s.listener != nil {
s.listener.Close()
}
// Terminate Tor subprocess
if s.cmd != nil && s.cmd.Process != nil {
log.D.F("sending SIGTERM to Tor subprocess (PID %d)", s.cmd.Process.Pid)
s.cmd.Process.Signal(os.Interrupt)
// Give it a few seconds to exit gracefully
done := make(chan struct{})
go func() {
s.cmd.Wait()
close(done)
}()
select {
case <-done:
log.D.F("Tor subprocess exited gracefully")
case <-time.After(5 * time.Second):
log.W.F("Tor subprocess did not exit, killing")
s.cmd.Process.Kill()
}
}
s.wg.Wait()
log.I.F("Tor service stopped")
return nil
}
// OnionAddress returns the current .onion address.
func (s *Service) OnionAddress() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.onionAddress
}
// OnionWSAddress returns the full WebSocket URL for the hidden service.
// Format: ws://<address>.onion/
func (s *Service) OnionWSAddress() string {
addr := s.OnionAddress()
if addr == "" {
return ""
}
// Ensure address ends with .onion
if len(addr) >= 6 && addr[len(addr)-6:] != ".onion" {
addr = addr + ".onion"
}
return "ws://" + addr + "/"
}
// IsRunning returns whether the Tor service is currently running.
func (s *Service) IsRunning() bool {
return s.listener != nil && s.cmd != nil && s.cmd.Process != nil
}
// Upgrader returns a WebSocket upgrader configured for Tor connections.
// Tor connections don't send Origin headers, so we skip origin check.
func (s *Service) Upgrader() *websocket.Upgrader {
return &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // Allow all origins for Tor
},
}
}
// DataDir returns the Tor data directory path.
func (s *Service) DataDir() string {
return s.cfg.DataDir
}
// HiddenServiceDir returns the hidden service directory path.
func (s *Service) HiddenServiceDir() string {
return filepath.Join(s.cfg.DataDir, "hidden_service")
}

View File

@@ -1 +1 @@
v0.44.7
v0.46.2

View File

@@ -0,0 +1,24 @@
//go:build js && wasm
package wasmdb
// RecordEventAccess is a stub for WasmDB.
// Access tracking is not implemented for the WebAssembly backend as it's
// primarily used for client-side applications where storage management
// is handled differently.
func (w *W) RecordEventAccess(serial uint64, connectionID string) error {
// No-op for WasmDB
return nil
}
// GetEventAccessInfo is a stub for WasmDB.
func (w *W) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
// No-op for WasmDB - return zeros
return 0, 0, nil
}
// GetLeastAccessedEvents is a stub for WasmDB.
func (w *W) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
// No-op for WasmDB - return empty slice
return nil, nil
}

54
scripts/enable-archive.sh Executable file
View File

@@ -0,0 +1,54 @@
#!/bin/bash
# Enable archive relay features on mleku.dev
set -e
SERVICE_FILE="/etc/systemd/system/orly.service"
echo "Updating orly.service to enable archive features..."
sudo tee "$SERVICE_FILE" > /dev/null << 'EOF'
[Unit]
Description=ORLY Nostr Relay
After=network.target
Wants=network.target
[Service]
Type=simple
User=mleku
Group=mleku
WorkingDirectory=/home/mleku/src/next.orly.dev
ExecStart=/home/mleku/.local/bin/orly
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=orly
# Archive relay query augmentation
Environment=ORLY_ARCHIVE_ENABLED=true
Environment=ORLY_ARCHIVE_RELAYS=wss://archive.orly.dev/
# Network settings
AmbientCapabilities=CAP_NET_BIND_SERVICE
[Install]
WantedBy=multi-user.target
EOF
echo "Reloading systemd..."
sudo systemctl daemon-reload
echo "Building and installing new version..."
cd /home/mleku/src/next.orly.dev
CGO_ENABLED=0 go build -o orly
sudo setcap 'cap_net_bind_service=+ep' ./orly
cp ./orly ~/.local/bin/
echo "Restarting orly service..."
sudo systemctl restart orly
echo "Done! Archive features are now enabled."
echo ""
echo "Check status with: sudo systemctl status orly"
echo "View logs with: sudo journalctl -u orly -f"