Compare commits

...

3 Commits

Author SHA1 Message Date
woikos
a0af5bb45e Fix Neo4j query returning zero events for REQ filters (v0.49.1)
Some checks failed
Go / build-and-release (push) Failing after 29s
- Fix zero-value timestamp filter bug: since/until with value 0 were
  being added as WHERE clauses, causing queries to match no events
- Fix event parsing: use direct slice assignment instead of copy() on
  nil slices for ID, Pubkey, and Sig fields

Files modified:
- pkg/neo4j/query-events.go: Fix buildCypherQuery and parseEventsFromResult

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:56:31 +01:00
woikos
9da1784b1b Add Blossom bandwidth limiting and tune rate limiters (v0.49.0)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add token-bucket bandwidth rate limiting for Blossom uploads
  - ORLY_BLOSSOM_RATE_LIMIT enables limiting (default: false)
  - ORLY_BLOSSOM_DAILY_LIMIT_MB sets daily limit (default: 10MB)
  - ORLY_BLOSSOM_BURST_LIMIT_MB sets burst cap (default: 50MB)
  - Followed users, admins, owners are exempt (unlimited)
- Change emergency mode throttling from exponential to linear scaling
  - Old: 16x multiplier at emergency threshold entry
  - New: 1x at threshold, +1x per 20% excess pressure
- Reduce follows ACL throttle increment from 200ms to 25ms per event
- Update dependencies

Files modified:
- app/blossom.go: Pass rate limit config to blossom server
- app/config/config.go: Add Blossom rate limit config options
- pkg/blossom/ratelimit.go: New bandwidth limiter implementation
- pkg/blossom/server.go: Add rate limiter integration
- pkg/blossom/handlers.go: Check rate limits on upload/mirror/media
- pkg/ratelimit/limiter.go: Linear emergency throttling
- pkg/acl/follows.go: Reduce default throttle increment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 08:09:39 +01:00
woikos
205f23fc0c Add message segmentation to NRC protocol spec (v0.48.15)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add CHUNK response type for large payload handling
- Document chunking threshold (40KB) accounting for encryption overhead
- Specify chunk message format with messageId, index, total, data fields
- Add sender chunking process with Base64 encoding steps
- Add receiver reassembly process with buffer management
- Document 60-second timeout for incomplete chunk buffers
- Update client/bridge implementation notes with chunking requirements
- Add Smesh as reference implementation for client-side chunking

Files modified:
- docs/NIP-NRC.md: Added Message Segmentation section and updated impl notes
- pkg/version/version: v0.48.14 -> v0.48.15

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 11:29:31 +01:00
19 changed files with 418 additions and 54 deletions

app/blossom.go
View File

@@ -22,6 +22,10 @@ func initializeBlossomServer(
 		MaxBlobSize:      100 * 1024 * 1024, // 100MB default
 		AllowedMimeTypes: nil,               // Allow all MIME types by default
 		RequireAuth:      cfg.AuthRequired || cfg.AuthToWrite,
+		// Rate limiting for non-followed users
+		RateLimitEnabled: cfg.BlossomRateLimitEnabled,
+		DailyLimitMB:     cfg.BlossomDailyLimitMB,
+		BurstLimitMB:     cfg.BlossomBurstLimitMB,
 	}

 	// Create blossom server with relay's ACL registry
@@ -31,7 +35,12 @@ func initializeBlossomServer(
 	// We'll need to modify the handler to inject the baseURL per request
 	// For now, we'll use a middleware approach
+	if cfg.BlossomRateLimitEnabled {
+		log.I.F("blossom server initialized with ACL mode: %s, rate limit: %dMB/day (burst: %dMB)",
+			cfg.ACLMode, cfg.BlossomDailyLimitMB, cfg.BlossomBurstLimitMB)
+	} else {
 		log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
+	}
 	return bs, nil
 }

app/config/config.go
View File

@@ -69,13 +69,18 @@ type C struct {
 	// Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
 	FollowsThrottleEnabled  bool          `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
-	FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
+	FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"25ms" usage:"delay added per event for non-followed users"`
 	FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`
 	// Blossom blob storage service settings
 	BlossomEnabled       bool   `env:"ORLY_BLOSSOM_ENABLED" default:"true" usage:"enable Blossom blob storage server (only works with Badger backend)"`
 	BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`
+	// Blossom upload rate limiting (for non-followed users)
+	BlossomRateLimitEnabled bool  `env:"ORLY_BLOSSOM_RATE_LIMIT" default:"false" usage:"enable upload rate limiting for non-followed users"`
+	BlossomDailyLimitMB     int64 `env:"ORLY_BLOSSOM_DAILY_LIMIT_MB" default:"10" usage:"daily upload limit in MB for non-followed users (EMA averaged)"`
+	BlossomBurstLimitMB     int64 `env:"ORLY_BLOSSOM_BURST_LIMIT_MB" default:"50" usage:"max burst upload in MB (bucket cap)"`
 	// Web UI and dev mode settings
 	WebDisableEmbedded bool   `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`
 	WebDevProxyURL     string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"`

View File

@@ -124,6 +124,17 @@ func (s *Server) handleCashuKeysets(w http.ResponseWriter, r *http.Request) {
 // handleCashuInfo handles GET /cashu/info - returns mint information.
 func (s *Server) handleCashuInfo(w http.ResponseWriter, r *http.Request) {
+	// CORS headers for browser-based CAT support detection
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
+	w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept")
+
+	// Handle preflight
+	if r.Method == http.MethodOptions {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
 	if s.CashuIssuer == nil {
 		http.Error(w, "Cashu tokens not enabled", http.StatusNotImplemented)
 		return

View File

@@ -21,7 +21,7 @@ import (
 )

 func (l *Listener) HandleEvent(msg []byte) (err error) {
-	log.D.F("HandleEvent: START handling event: %s", msg)
+	log.I.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))
 	// 1. Raw JSON validation (before unmarshal) - use validation service
 	if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
@@ -231,6 +231,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 	// Authorization check (policy + ACL) - use authorization service
 	decision := l.eventAuthorizer.Authorize(env.E, l.authedPubkey.Load(), l.remote, env.E.Kind)
+	// Debug: log ephemeral event authorization
+	if env.E.Kind >= 20000 && env.E.Kind < 30000 {
+		log.I.F("ephemeral auth check: kind %d, allowed=%v, reason=%s",
+			env.E.Kind, decision.Allowed, decision.DenyReason)
+	}
 	if !decision.Allowed {
 		log.D.F("HandleEvent: authorization denied: %s (requireAuth=%v)", decision.DenyReason, decision.RequireAuth)
 		if decision.RequireAuth {
@@ -256,6 +261,8 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 	log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)
 	// Progressive throttle for follows ACL mode (delays non-followed users)
+	// Skip throttle if a Cashu Access Token is present (authenticated via CAT)
+	if l.cashuToken == nil {
 		if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
 			log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
 				delay, env.E.Pubkey, l.remote)
@@ -266,6 +273,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 			// Delay completed, continue processing
 		}
 	}
+	}
 	// Route special event kinds (ephemeral, etc.) - use routing service
 	if routeResult := l.eventRouter.Route(env.E, l.authedPubkey.Load()); routeResult.Action != routing.Continue {

View File

@@ -159,12 +159,26 @@ func (p *P) Deliver(ev *event.E) {
 		sub Subscription
 	}
 	var deliveries []delivery
+	// Debug: log ephemeral event delivery attempts
+	isEphemeral := ev.Kind >= 20000 && ev.Kind < 30000
+	if isEphemeral {
+		var tagInfo string
+		if ev.Tags != nil {
+			tagInfo = string(ev.Tags.Marshal(nil))
+		}
+		log.I.F("ephemeral event kind %d, id %0x, checking %d connections for matches, tags: %s",
+			ev.Kind, ev.ID[:8], len(p.Map), tagInfo)
+	}
 	for w, subs := range p.Map {
 		for id, subscriber := range subs {
 			if subscriber.Match(ev) {
 				deliveries = append(
 					deliveries, delivery{w: w, id: id, sub: subscriber},
 				)
+			} else if isEphemeral {
+				// Debug: log why ephemeral events don't match
+				log.I.F("ephemeral event kind %d did NOT match subscription %s (filters: %s)",
+					ev.Kind, id, string(subscriber.S.Marshal(nil)))
 			}
 		}
 	}

docs/NIP-NRC.md
View File

@@ -137,7 +137,7 @@ Where `payload` is the standard Nostr message array, e.g.:
 The encrypted content structure:

 ```json
 {
-  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH",
+  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH" | "CHUNK",
   "payload": <standard_nostr_response_array>
 }
 ```
@@ -150,6 +150,7 @@ Where `payload` is the standard Nostr response array, e.g.:
 - `["CLOSED", "<sub_id>", "<message>"]`
 - `["COUNT", "<sub_id>", {"count": <n>}]`
 - `["AUTH", "<challenge>"]`
+- `[<chunk_object>]` (for CHUNK type, see Message Segmentation)

 ### Session Management
@@ -168,6 +169,85 @@ The conversation key is derived from:
 - **Secret-based auth**: ECDH between client's secret key (derived from URI secret) and relay's public key
 - **CAT auth**: ECDH between client's Nostr key and relay's public key
+
+### Message Segmentation
+
+Some Nostr events exceed typical relay message size limits (commonly 64KB). NRC supports message segmentation to handle large payloads by splitting them into multiple chunks.
+
+#### When to Chunk
+
+Senders SHOULD chunk messages when the JSON-serialized response exceeds 40KB. This threshold accounts for:
+
+- NIP-44 encryption overhead (~100 bytes)
+- Base64 encoding expansion (~33%)
+- Event wrapper overhead (tags, signature, etc.)
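To make the overhead arithmetic concrete, here is a minimal sketch in Go (editor's illustration, not part of the spec; the 64KB figure is the commonly cited relay limit mentioned above):

```go
package main

import "fmt"

func main() {
	raw := 40 * 1024         // 40KB of serialized JSON, the chunking threshold
	b64 := (raw + 2) / 3 * 4 // Base64 expands payloads by ~33%
	total := b64 + 100       // plus ~100 bytes of NIP-44 encryption overhead
	// 54616 + 100 = 54716 bytes, which leaves headroom under a 64KB
	// relay limit for the event wrapper (tags, signature, etc.).
	fmt.Println(total, total < 64*1024) // 54716 true
}
```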
+#### Chunk Message Format
+
+When a response is too large, it is split into multiple CHUNK responses:
+
+```json
+{
+  "type": "CHUNK",
+  "payload": [{
+    "type": "CHUNK",
+    "messageId": "<uuid>",
+    "index": 0,
+    "total": 3,
+    "data": "<base64_encoded_chunk>"
+  }]
+}
+```
+
+Fields:
+
+- `messageId`: A unique identifier (UUID) for the chunked message, used to correlate chunks
+- `index`: Zero-based chunk index (0, 1, 2, ...)
+- `total`: Total number of chunks in this message
+- `data`: Base64-encoded segment of the original message
+
+#### Chunking Process (Sender)
+
+1. Serialize the original response message to JSON
+2. If the serialized length exceeds the threshold (40KB), proceed with chunking
+3. Encode the JSON string as UTF-8, then Base64 encode it
+4. Split the Base64 string into chunks of the maximum chunk size
+5. Generate a unique `messageId` (UUID recommended)
+6. Send each chunk as a separate CHUNK response event
+
+Example encoding (JavaScript):
+
+```javascript
+const encoded = btoa(unescape(encodeURIComponent(jsonString)))
+```
+
+#### Reassembly Process (Receiver)
+
+1. When receiving a CHUNK response, buffer it by `messageId`
+2. Track received chunks by `index`
+3. When all chunks are received (`chunks.size === total`):
+   a. Concatenate chunk data in index order (0, 1, 2, ...)
+   b. Base64 decode the concatenated string
+   c. Parse as UTF-8 JSON to recover the original response
+4. Process the reassembled response as normal
+5. Clean up the chunk buffer
+
+Example decoding (JavaScript):
+
+```javascript
+const jsonString = decodeURIComponent(escape(atob(concatenatedBase64)))
+const response = JSON.parse(jsonString)
+```
+
+#### Chunk Buffer Management
+
+Receivers MUST implement chunk buffer cleanup:
+
+- Discard incomplete chunk buffers after 60 seconds of inactivity
+- Limit the number of concurrent incomplete messages to prevent memory exhaustion
+- Log warnings when discarding stale buffers for debugging
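The buffer rules above can be illustrated with a small Go sketch (editor's illustration; the types and helpers here are invented for the example, not taken from any implementation):

```go
package nrc

import (
	"strings"
	"time"
)

// chunkBuffer accumulates the pieces of one chunked message.
type chunkBuffer struct {
	total    int
	parts    map[int]string // chunk index -> base64 data
	lastSeen time.Time
}

// add stores one chunk and returns the reassembled Base64 string once
// every index from 0..total-1 has arrived; duplicates are ignored.
func (b *chunkBuffer) add(index, total int, data string) (string, bool) {
	b.total = total
	b.lastSeen = time.Now()
	if _, dup := b.parts[index]; dup {
		return "", false // duplicate (same messageId + index): ignore
	}
	b.parts[index] = data
	if len(b.parts) < b.total {
		return "", false // still waiting for more chunks
	}
	var sb strings.Builder
	for i := 0; i < b.total; i++ { // concatenate in index order
		sb.WriteString(b.parts[i])
	}
	return sb.String(), true
}

// sweep drops buffers idle for over 60 seconds, as the spec requires;
// an incomplete message is thereby lost rather than held forever.
func sweep(buffers map[string]*chunkBuffer) {
	for id, b := range buffers {
		if time.Since(b.lastSeen) > 60*time.Second {
			delete(buffers, id)
		}
	}
}
```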
+#### Ordering and Reliability
+
+- Chunks MAY arrive out of order; receivers MUST reassemble by index
+- Missing chunks result in message loss; the incomplete buffer is eventually discarded
+- Duplicate chunks (same messageId + index) SHOULD be ignored
+- Each chunk is sent as a separate encrypted NRC response event

 ### Authentication

 #### Secret-Based Authentication
@@ -208,6 +288,9 @@ The conversation key is derived from:
 4. Match responses using the `e` tag (references request event ID)
 5. Handle EOSE by waiting for kind 24892 with type "EOSE" in content
 6. For subscriptions, maintain mapping of internal sub IDs to tunnel session
+7. **Chunking**: Maintain a chunk buffer map keyed by `messageId`
+8. **Chunking**: When receiving CHUNK responses, buffer chunks and reassemble when complete
+9. **Chunking**: Implement 60-second timeout for incomplete chunk buffers

 ## Bridge Implementation Notes

@@ -217,10 +300,14 @@ The conversation key is derived from:
 4. Capture all relay responses and wrap in kind 24892
 5. Sign with relay's key and publish to rendezvous relay
 6. Maintain session state for subscription mapping
+7. **Chunking**: Check response size before sending; chunk if > 40KB
+8. **Chunking**: Use consistent messageId (UUID) across all chunks of a message
+9. **Chunking**: Send chunks in order (index 0, 1, 2, ...) for optimal reassembly

 ## Reference Implementations

-- ORLY Relay: [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
+- ORLY Relay (Bridge): [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
+- Smesh Client: [https://git.mleku.dev/mleku/smesh](https://git.mleku.dev/mleku/smesh)

 ## See Also

8
go.mod
View File

@@ -3,12 +3,14 @@ module next.orly.dev
 go 1.25.3

 require (
-	git.mleku.dev/mleku/nostr v1.0.12
+	git.mleku.dev/mleku/nostr v1.0.13
 	github.com/adrg/xdg v0.5.3
 	github.com/alexflint/go-arg v1.6.1
 	github.com/aperturerobotics/go-indexeddb v0.2.3
+	github.com/bits-and-blooms/bloom/v3 v3.7.1
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
 	github.com/dgraph-io/badger/v4 v4.8.0
+	github.com/google/uuid v1.6.0
 	github.com/gorilla/websocket v1.5.3
 	github.com/hack-pad/safejs v0.1.1
 	github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
@@ -22,6 +24,7 @@ require (
 	github.com/stretchr/testify v1.11.1
 	github.com/vertex-lab/nostr-sqlite v0.3.2
 	go-simpler.org/env v0.12.0
+	go.etcd.io/bbolt v1.4.3
 	go.uber.org/atomic v1.11.0
 	golang.org/x/crypto v0.46.0
 	golang.org/x/lint v0.0.0-20241112194109-818c5a804067
@@ -37,7 +40,6 @@ require (
 	github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
 	github.com/alexflint/go-scalar v1.2.0 // indirect
 	github.com/bits-and-blooms/bitset v1.24.2 // indirect
-	github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
 	github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
 	github.com/bytedance/sonic v1.13.1 // indirect
@@ -56,7 +58,6 @@ require (
 	github.com/google/btree v1.1.2 // indirect
 	github.com/google/flatbuffers v25.9.23+incompatible // indirect
 	github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/klauspost/cpuid/v2 v2.3.0 // indirect
@@ -72,7 +73,6 @@ require (
 	github.com/tidwall/match v1.1.1 // indirect
 	github.com/tidwall/pretty v1.2.1 // indirect
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
-	go.etcd.io/bbolt v1.4.3 // indirect
 	go.opentelemetry.io/auto/sdk v1.2.1 // indirect
 	go.opentelemetry.io/otel v1.38.0 // indirect
 	go.opentelemetry.io/otel/metric v1.38.0 // indirect

5
go.sum
View File

@@ -1,5 +1,5 @@
-git.mleku.dev/mleku/nostr v1.0.12 h1:bjsFUh1Q3fGpU7qsqxggGgrGGUt2OBdu1w8hjDM4gJE=
-git.mleku.dev/mleku/nostr v1.0.12/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
+git.mleku.dev/mleku/nostr v1.0.13 h1:FqeOQ9ZX8AFVsAI6XisQkB6cgmhn9DNQ2a8li9gx7aY=
+git.mleku.dev/mleku/nostr v1.0.13/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
 github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
 github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
 github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
@@ -161,6 +161,7 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
 github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
 github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
 github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
 github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
 github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E=
 github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM=

pkg/acl/follows.go
View File

@@ -138,7 +138,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 	if f.cfg.FollowsThrottleEnabled {
 		perEvent := f.cfg.FollowsThrottlePerEvent
 		if perEvent == 0 {
-			perEvent = 200 * time.Millisecond
+			perEvent = 25 * time.Millisecond
 		}
 		maxDelay := f.cfg.FollowsThrottleMaxDelay
 		if maxDelay == 0 {

pkg/blossom/handlers.go
View File

@@ -200,6 +200,12 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
 		return
 	}
+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
 	// Calculate SHA256 after auth check
 	sha256Hash := CalculateSHA256(body)
 	sha256Hex := hex.Enc(sha256Hash)
@@ -647,6 +653,12 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) {
 		return
 	}
+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
 	// Note: pubkey may be nil for anonymous uploads if ACL allows it

 	// Detect MIME type from remote response
@@ -726,6 +738,12 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) {
 		return
 	}
+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
 	// Note: pubkey may be nil for anonymous uploads if ACL allows it

 	// Optimize media (placeholder - actual optimization would be implemented here)

131
pkg/blossom/ratelimit.go Normal file
View File

@@ -0,0 +1,131 @@
package blossom

import (
	"sync"
	"time"
)

// BandwidthState tracks upload bandwidth for an identity
type BandwidthState struct {
	BucketBytes int64     // Current token bucket level (bytes available)
	LastUpdate  time.Time // Last time bucket was updated
}

// BandwidthLimiter implements token bucket rate limiting for uploads.
// Each identity gets a bucket that replenishes at dailyLimit/day rate.
// Uploads consume tokens from the bucket.
type BandwidthLimiter struct {
	mu         sync.Mutex
	states     map[string]*BandwidthState // keyed by pubkey hex or IP
	dailyLimit int64                      // bytes per day
	burstLimit int64                      // max bucket size (burst capacity)
	refillRate float64                    // bytes per second refill rate
}

// NewBandwidthLimiter creates a new bandwidth limiter.
// dailyLimitMB is the average daily limit in megabytes.
// burstLimitMB is the maximum burst capacity in megabytes.
func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
	dailyBytes := dailyLimitMB * 1024 * 1024
	burstBytes := burstLimitMB * 1024 * 1024
	return &BandwidthLimiter{
		states:     make(map[string]*BandwidthState),
		dailyLimit: dailyBytes,
		burstLimit: burstBytes,
		refillRate: float64(dailyBytes) / 86400.0, // bytes per second
	}
}

// CheckAndConsume checks if an upload of the given size is allowed for the identity,
// and if so, consumes the tokens. Returns true if allowed, false if rate limited.
// The identity should be pubkey hex for authenticated users, or IP for anonymous.
func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
	bl.mu.Lock()
	defer bl.mu.Unlock()
	now := time.Now()
	state, exists := bl.states[identity]
	if !exists {
		// New identity starts with full burst capacity
		state = &BandwidthState{
			BucketBytes: bl.burstLimit,
			LastUpdate:  now,
		}
		bl.states[identity] = state
	} else {
		// Refill bucket based on elapsed time
		elapsed := now.Sub(state.LastUpdate).Seconds()
		refill := int64(elapsed * bl.refillRate)
		state.BucketBytes += refill
		if state.BucketBytes > bl.burstLimit {
			state.BucketBytes = bl.burstLimit
		}
		state.LastUpdate = now
	}
	// Check if upload fits in bucket
	if state.BucketBytes >= sizeBytes {
		state.BucketBytes -= sizeBytes
		return true
	}
	return false
}

// GetAvailable returns the currently available bytes for an identity.
func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
	bl.mu.Lock()
	defer bl.mu.Unlock()
	state, exists := bl.states[identity]
	if !exists {
		return bl.burstLimit // New users have full capacity
	}
	// Calculate current level with refill
	now := time.Now()
	elapsed := now.Sub(state.LastUpdate).Seconds()
	refill := int64(elapsed * bl.refillRate)
	available := state.BucketBytes + refill
	if available > bl.burstLimit {
		available = bl.burstLimit
	}
	return available
}

// GetTimeUntilAvailable returns how long until the given bytes will be available.
func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
	available := bl.GetAvailable(identity)
	if available >= sizeBytes {
		return 0
	}
	needed := sizeBytes - available
	seconds := float64(needed) / bl.refillRate
	return time.Duration(seconds * float64(time.Second))
}

// Cleanup removes entries that have fully replenished (at burst limit).
func (bl *BandwidthLimiter) Cleanup() {
	bl.mu.Lock()
	defer bl.mu.Unlock()
	now := time.Now()
	for key, state := range bl.states {
		elapsed := now.Sub(state.LastUpdate).Seconds()
		refill := int64(elapsed * bl.refillRate)
		if state.BucketBytes+refill >= bl.burstLimit {
			delete(bl.states, key)
		}
	}
}

// Stats returns the number of tracked identities.
func (bl *BandwidthLimiter) Stats() int {
	bl.mu.Lock()
	defer bl.mu.Unlock()
	return len(bl.states)
}

pkg/blossom/server.go
View File

@@ -19,6 +19,9 @@ type Server struct {
 	maxBlobSize      int64
 	allowedMimeTypes map[string]bool
 	requireAuth      bool
+	// Rate limiting for uploads
+	bandwidthLimiter *BandwidthLimiter
 }

 // Config holds configuration for the Blossom server
@@ -27,6 +30,11 @@ type Config struct {
 	MaxBlobSize      int64
 	AllowedMimeTypes []string
 	RequireAuth      bool
+	// Rate limiting (for non-followed users)
+	RateLimitEnabled bool
+	DailyLimitMB     int64
+	BurstLimitMB     int64
 }

 // NewServer creates a new Blossom server instance
@@ -48,6 +56,20 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
 		}
 	}
+	// Initialize bandwidth limiter if enabled
+	var bwLimiter *BandwidthLimiter
+	if cfg.RateLimitEnabled {
+		dailyMB := cfg.DailyLimitMB
+		if dailyMB <= 0 {
+			dailyMB = 10 // 10MB default
+		}
+		burstMB := cfg.BurstLimitMB
+		if burstMB <= 0 {
+			burstMB = 50 // 50MB default burst
+		}
+		bwLimiter = NewBandwidthLimiter(dailyMB, burstMB)
+	}
 	return &Server{
 		db:      db,
 		storage: storage,
@@ -56,6 +78,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
 		maxBlobSize:      cfg.MaxBlobSize,
 		allowedMimeTypes: allowedMap,
 		requireAuth:      cfg.RequireAuth,
+		bandwidthLimiter: bwLimiter,
 	}
 }

@@ -208,6 +231,44 @@ func (s *Server) checkACL(
 	return actual >= required
 }

+// isRateLimitExempt returns true if the user is exempt from rate limiting.
+// Users with write access or higher (followed users, admins, owners) are exempt.
+func (s *Server) isRateLimitExempt(pubkey []byte, remoteAddr string) bool {
+	if s.acl == nil {
+		return true // No ACL configured, no rate limiting
+	}
+	level := s.acl.GetAccessLevel(pubkey, remoteAddr)
+	// Followed users get "write" level, admins/owners get higher
+	// Only "read" and "none" are rate limited
+	return level == "write" || level == "admin" || level == "owner"
+}
+
+// checkBandwidthLimit checks if the upload is allowed under rate limits.
+// Returns true if allowed, false if rate limited.
+// Exempt users (followed, admin, owner) always return true.
+func (s *Server) checkBandwidthLimit(pubkey []byte, remoteAddr string, sizeBytes int64) bool {
+	if s.bandwidthLimiter == nil {
+		return true // No rate limiting configured
+	}
+	// Check if user is exempt
+	if s.isRateLimitExempt(pubkey, remoteAddr) {
+		return true
+	}
+	// Use pubkey hex if available, otherwise IP
+	var identity string
+	if len(pubkey) > 0 {
+		identity = string(pubkey) // Will be converted to hex in handler
+	} else {
+		identity = remoteAddr
+	}
+	return s.bandwidthLimiter.CheckAndConsume(identity, sizeBytes)
+}
+
 // BaseURLKey is the context key for the base URL (exported for use by app handler)
 type BaseURLKey struct{}

View File

@@ -6,6 +6,7 @@ import (
 	"sort"

 	"lol.mleku.dev/chk"
+	"lol.mleku.dev/errorf"
 	"lol.mleku.dev/log"

 	"next.orly.dev/pkg/database/indexes"
 	types2 "next.orly.dev/pkg/database/indexes/types"
@@ -44,6 +45,12 @@ func NormalizeTagValueForHash(key byte, valueBytes []byte) []byte {
 func CreateIdHashFromData(data []byte) (i *types2.IdHash, err error) {
 	i = new(types2.IdHash)
+	// Skip empty data to avoid noisy errors
+	if len(data) == 0 {
+		err = errorf.E("CreateIdHashFromData: empty ID provided")
+		return
+	}
 	// If data looks like hex string and has the right length for hex-encoded
 	// sha256
 	if len(data) == 64 {
@@ -95,6 +102,11 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 	// should be an error, but convention just ignores it.
 	if f.Ids.Len() > 0 {
 		for _, id := range f.Ids.T {
+			// Skip empty IDs - some filters have empty ID values
+			if len(id) == 0 {
+				log.D.F("GetIndexesFromFilter: skipping empty ID in filter (ids=%d)", f.Ids.Len())
+				continue
+			}
 			if err = func() (err error) {
 				var i *types2.IdHash
 				if i, err = CreateIdHashFromData(id); chk.E(err) {

View File

@@ -20,6 +20,10 @@ import (
 func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
 	// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
+	if len(id) == 0 {
+		err = errorf.E("GetSerialById: called with empty ID")
+		return
+	}
 	var idxs []Range
 	if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
 		return
@@ -102,6 +106,10 @@ func (d *D) GetSerialsByIdsWithFilter(
 	// Process each ID sequentially
 	for _, id := range ids.T {
+		// Skip empty IDs
+		if len(id) == 0 {
+			continue
+		}
 		// idHex := hex.Enc(id)
 		// Get the index prefix for this ID

View File

@@ -24,8 +24,8 @@ func (i *IdHash) Set(idh []byte) {
 func (i *IdHash) FromId(id []byte) (err error) {
 	if len(id) != sha256.Size {
 		err = errorf.E(
-			"FromId: invalid ID length, got %d require %d", len(id),
-			sha256.Size,
+			"FromId: invalid ID length, got %d require %d (data=%x)", len(id),
+			sha256.Size, id,
 		)
 		return
 	}

View File

@@ -3,6 +3,7 @@ package routing
 import (
 	"git.mleku.dev/mleku/nostr/encoders/event"
 	"git.mleku.dev/mleku/nostr/encoders/kind"
+	"lol.mleku.dev/log"
 )

 // Publisher abstracts event delivery to subscribers.
@@ -22,6 +23,7 @@ func IsEphemeral(k uint16) bool {
 // - Are immediately delivered to subscribers
 func MakeEphemeralHandler(publisher Publisher) Handler {
 	return func(ev *event.E, authedPubkey []byte) Result {
+		log.I.F("ephemeral handler received event kind %d, id %0x", ev.Kind, ev.ID[:8])
 		// Clone and deliver immediately without persistence
 		cloned := ev.Clone()
 		go publisher.Deliver(cloned)

pkg/neo4j/query-events.go
View File

@@ -130,11 +130,13 @@ func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map
 	}

 	// Time range filters - for temporal queries
-	if f.Since != nil {
+	// Note: Check both pointer and value - a zero timestamp (Unix epoch 1970) is almost
+	// certainly not a valid constraint as Nostr events didn't exist then
+	if f.Since != nil && f.Since.V > 0 {
 		params["since"] = f.Since.V
 		whereClauses = append(whereClauses, "e.created_at >= $since")
 	}
-	if f.Until != nil {
+	if f.Until != nil && f.Until.V > 0 {
 		params["until"] = f.Until.V
 		whereClauses = append(whereClauses, "e.created_at <= $until")
 	}
@@ -300,19 +302,17 @@ func (n *N) parseEventsFromResult(result *CollectedResult) ([]*event.E, error) {
 			_ = tags.UnmarshalJSON([]byte(tagsStr))
 		}

-		// Create event
+		// Create event with decoded binary fields
 		e := &event.E{
+			ID:        id,
+			Pubkey:    pubkey,
 			Kind:      uint16(kind),
 			CreatedAt: createdAt,
 			Content:   []byte(content),
 			Tags:      tags,
+			Sig:       sig,
 		}
-		// Copy fixed-size arrays
-		copy(e.ID[:], id)
-		copy(e.Sig[:], sig)
-		copy(e.Pubkey[:], pubkey)
 		events = append(events, e)
 	}

pkg/ratelimit/limiter.go
View File

@@ -377,16 +377,14 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {
 	// In emergency mode, apply progressive throttling for writes
 	if inEmergency {
-		// Calculate how far above recovery threshold we are
-		// At emergency threshold, add 1x normal delay
-		// For every additional 10% above emergency, double the delay
-		excessPressure := metrics.MemoryPressure - l.config.RecoveryThreshold
-		if excessPressure > 0 {
-			// Progressive multiplier: starts at 2x, doubles every 10% excess
-			multiplier := 2.0
-			for excess := excessPressure; excess > 0.1; excess -= 0.1 {
-				multiplier *= 2
-			}
+		// Calculate how far above emergency threshold we are
+		// Linear scaling: multiplier = 1 + (excess * 5)
+		// At emergency threshold: 1x, at +20% above: 2x, at +40% above: 3x
+		excessPressure := metrics.MemoryPressure - l.config.EmergencyThreshold
+		if excessPressure < 0 {
+			excessPressure = 0
+		}
+		multiplier := 1.0 + excessPressure*5.0
 		emergencyDelaySec := delaySec * multiplier
 		maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0
@@ -400,7 +398,6 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {
 		}
 		delaySec = emergencyDelaySec
 	}
-	}
 	if delaySec > 0 {
 		l.writeThrottles.Add(1)
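
The new linear rule is easy to sanity-check against the 2x and 3x points named in the comments above. A small sketch (editor's illustration; the 0.60 threshold is an assumed example value, not the configured default):

```go
package main

import "fmt"

// emergencyMultiplier reproduces the linear scaling from the diff above:
// 1x at the emergency threshold, +1x for each additional 20% of pressure.
func emergencyMultiplier(pressure, threshold float64) float64 {
	excess := pressure - threshold
	if excess < 0 {
		excess = 0
	}
	return 1.0 + excess*5.0
}

func main() {
	fmt.Println(emergencyMultiplier(0.60, 0.60)) // 1 at threshold entry
	fmt.Println(emergencyMultiplier(0.80, 0.60)) // 2 at +20% excess
	fmt.Println(emergencyMultiplier(1.00, 0.60)) // 3 at +40% excess
}
```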

pkg/version/version
View File

@@ -1 +1 @@
-v0.48.14
+v0.49.1