Files
next.orly.dev/app/handle-req.go
mleku 6b72f1f2b7
Some checks failed
Go / build-and-release (push) Has been cancelled
Update privileged event filtering to respect ACL mode
Privileged events are now filtered based on ACL mode, allowing open access when ACL is "none." Added tests to verify behavior for different ACL modes, ensuring unauthorized and unauthenticated users can only access privileged events when explicitly permitted. Version bumped to v0.34.2.
2025-12-05 10:02:49 +00:00

784 lines
24 KiB
Go

package app
import (
"context"
"encoding/hex"
"errors"
"fmt"
"strings"
"time"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/acl"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
"git.mleku.dev/mleku/nostr/encoders/envelopes/authenvelope"
"git.mleku.dev/mleku/nostr/encoders/envelopes/closedenvelope"
"git.mleku.dev/mleku/nostr/encoders/envelopes/eoseenvelope"
"git.mleku.dev/mleku/nostr/encoders/envelopes/eventenvelope"
"git.mleku.dev/mleku/nostr/encoders/envelopes/reqenvelope"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
hexenc "git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/reason"
"git.mleku.dev/mleku/nostr/encoders/tag"
"next.orly.dev/pkg/policy"
"next.orly.dev/pkg/protocol/graph"
"next.orly.dev/pkg/protocol/nip43"
"git.mleku.dev/mleku/nostr/utils/normalize"
"git.mleku.dev/mleku/nostr/utils/pointers"
)
// HandleReq processes one REQ envelope received on this connection.
//
// The steps, in order, are:
//
//  1. Unmarshal msg into a reqenvelope; a parse failure is returned as a
//     normalize.Error error.
//  2. Send an AUTH challenge when the connection is unauthenticated and an ACL
//     is active, AuthRequired is set, or AuthToWrite is set.
//  3. Deny the request with a CLOSED (auth-required) envelope when auth is
//     required but missing, or when the ACL access level does not permit
//     reading.
//  4. Short-circuit NIP-43 invite requests and _graph queries: each is answered
//     with a single event followed by EOSE.
//  5. Strip kind.PolicyConfig from the filters for callers that are not policy
//     administrators.
//  6. Serve single-filter requests from the event cache when possible.
//  7. Query every filter (large author lists are queried in chunks), then
//     filter the results by private tag, privileged kind, policy, managed ACL
//     and private tag again, deduplicating by event ID along the way.
//  8. Send the surviving events followed by EOSE.
//  9. Register a live subscription for the filters that were not fully
//     satisfied and start a goroutine that forwards published events to the
//     client.
//
// The returned error is non-nil when the message cannot be parsed, when a
// write to the client fails, or when the database reports badger.ErrDBClosed.
// Other query failures are logged and the affected filter is skipped. A denied
// request is not an error: the client receives CLOSED and nil is returned
// (unless writing the CLOSED envelope itself fails).
func (l *Listener) HandleReq(msg []byte) (err error) {
	log.D.F("handling REQ: %s", msg)
	log.T.F("HandleReq: START processing from %s", l.remote)
	env := reqenvelope.New()
	if _, err = env.Unmarshal(msg); chk.E(err) {
		// Provide more specific error context for JSON parsing failures
		if strings.Contains(err.Error(), "invalid character") {
			log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
			log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
			return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
		}
		// The error text is passed as an argument, never as the format
		// string, so a '%' inside it cannot be misread as a formatting verb.
		return normalize.Error.Errorf("%s", err.Error())
	}
	log.T.C(
		func() string {
			return fmt.Sprintf(
				"REQ sub=%s filters=%d", env.Subscription, len(*env.Filters),
			)
		},
	)
	// send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled
	if len(l.authedPubkey.Load()) == 0 && (acl.Registry.Active.Load() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) {
		if err = authenvelope.NewChallengeWith(l.challenge.Load()).
			Write(l); chk.E(err) {
			return
		}
	}
	// check permissions of user
	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
	// If auth is required but user is not authenticated, deny access
	if l.Config.AuthRequired && len(l.authedPubkey.Load()) == 0 {
		if err = closedenvelope.NewFrom(
			env.Subscription,
			reason.AuthRequired.F("authentication required"),
		).Write(l); chk.E(err) {
			return
		}
		return
	}
	// If AuthToWrite is enabled, an unauthenticated REQ is allowed, but the
	// ACL access level is still honoured when an ACL is active.
	if l.Config.AuthToWrite && len(l.authedPubkey.Load()) == 0 {
		if acl.Registry.Active.Load() != "none" {
			switch accessLevel {
			case "none", "blocked", "banned":
				if err = closedenvelope.NewFrom(
					env.Subscription,
					reason.AuthRequired.F("user not authed or has no read access"),
				).Write(l); chk.E(err) {
					return
				}
				return
			}
		}
		// Allow the request to proceed without authentication
	}
	// Only check ACL access level if not already handled by AuthToWrite
	if !l.Config.AuthToWrite || len(l.authedPubkey.Load()) > 0 {
		switch accessLevel {
		case "none":
			// For REQ denial, send a CLOSED with auth-required reason (NIP-01)
			if err = closedenvelope.NewFrom(
				env.Subscription,
				reason.AuthRequired.F("user not authed or has no read access"),
			).Write(l); chk.E(err) {
				return
			}
			return
		default:
			// user has read access or better, continue
		}
	}
	// Handle NIP-43 invite request (nip43.KindInviteReq). The first filter
	// that asks for that kind is answered with a freshly generated invite
	// event plus EOSE, and the remaining filters are not processed.
	for _, f := range *env.Filters {
		if f != nil && f.Kinds != nil {
			if f.Kinds.Contains(nip43.KindInviteReq) {
				// Generate and send invite event
				inviteEvent, inviteErr := l.Server.HandleNIP43InviteRequest(l.authedPubkey.Load())
				if inviteErr != nil {
					log.W.F("failed to generate NIP-43 invite: %v", inviteErr)
					// Send EOSE and return
					if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
						return err
					}
					return nil
				}
				// Send the invite event. The envelope error is checked so a
				// failed construction is reported instead of writing through
				// an unusable envelope.
				evEnv, envErr := eventenvelope.NewResultWith(env.Subscription, inviteEvent)
				if chk.E(envErr) {
					return envErr
				}
				if err = evEnv.Write(l); chk.E(err) {
					return err
				}
				// Send EOSE
				if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
					return err
				}
				log.I.F("sent NIP-43 invite event to %s", l.remote)
				return nil
			}
		}
	}
	// Check for graph queries in filters. Graph queries use the _graph filter
	// extension; the first one found is executed and answered with a single
	// result event plus EOSE, and the remaining filters are not processed.
	for _, f := range *env.Filters {
		if f != nil && graph.IsGraphQuery(f) {
			graphQuery, graphErr := graph.ExtractFromFilter(f)
			if graphErr != nil {
				log.W.F("invalid _graph query from %s: %v", l.remote, graphErr)
				if err = closedenvelope.NewFrom(
					env.Subscription,
					reason.Error.F("invalid _graph query: %s", graphErr.Error()),
				).Write(l); chk.E(err) {
					return
				}
				return
			}
			if graphQuery != nil {
				log.I.F("graph query from %s: method=%s seed=%s depth=%d",
					l.remote, graphQuery.Method, graphQuery.Seed, graphQuery.Depth)
				// Check if graph executor is available
				if l.graphExecutor == nil {
					log.W.F("graph query received but executor not initialized")
					if err = closedenvelope.NewFrom(
						env.Subscription,
						reason.Error.F("graph queries not supported on this relay"),
					).Write(l); chk.E(err) {
						return
					}
					return
				}
				// Execute the graph query
				resultEvent, execErr := l.graphExecutor.Execute(graphQuery)
				if execErr != nil {
					log.W.F("graph query execution failed from %s: %v", l.remote, execErr)
					if err = closedenvelope.NewFrom(
						env.Subscription,
						reason.Error.F("graph query failed: %s", execErr.Error()),
					).Write(l); chk.E(err) {
						return
					}
					return
				}
				// Send the result event
				var res *eventenvelope.Result
				if res, err = eventenvelope.NewResultWith(env.Subscription, resultEvent); chk.E(err) {
					return
				}
				if err = res.Write(l); chk.E(err) {
					return
				}
				// Send EOSE to signal completion
				if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
					return
				}
				log.I.F("graph query completed for %s: method=%s, returned event kind %d",
					l.remote, graphQuery.Method, resultEvent.Kind)
				return
			}
		}
	}
	// Filter out policy config events (kind.PolicyConfig) for callers that
	// are not policy administrators by removing that kind from every filter.
	// NOTE(review): a filter that asked only for kind.PolicyConfig ends up
	// with an empty kinds list; confirm QueryEvents does not treat that as
	// "match every kind".
	if l.policyManager != nil && l.policyManager.IsEnabled() {
		isPolicyAdmin := l.policyManager.IsPolicyAdmin(l.authedPubkey.Load())
		if !isPolicyAdmin {
			for _, f := range *env.Filters {
				if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
					// Create a new kinds list without PolicyConfig
					var filteredKinds []*kind.K
					for _, k := range f.Kinds.K {
						if k.K != kind.PolicyConfig.K {
							filteredKinds = append(filteredKinds, k)
						}
					}
					f.Kinds.K = filteredKinds
				}
			}
		}
	}
	var events event.S
	// Create a single context for all filter queries, isolated from the connection context
	// to prevent query timeouts from affecting the long-lived websocket connection
	queryCtx, queryCancel := context.WithTimeout(
		context.Background(), 30*time.Second,
	)
	defer queryCancel()
	// Check cache first for single-filter queries; multi-filter queries are
	// not cached. The nil checks run before any dereference.
	if env.Filters != nil && len(*env.Filters) == 1 && (*env.Filters)[0] != nil {
		f := (*env.Filters)[0]
		if cachedEvents, found := l.DB.GetCachedEvents(f); found {
			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedEvents))
			// Wrap cached events with current subscription ID
			for _, ev := range cachedEvents {
				var res *eventenvelope.Result
				if res, err = eventenvelope.NewResultWith(env.Subscription, ev); chk.E(err) {
					return
				}
				if err = res.Write(l); err != nil {
					if !strings.Contains(err.Error(), "context canceled") {
						chk.E(err)
					}
					return
				}
			}
			// Send EOSE
			if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
				return
			}
			// Don't create subscription for cached results with satisfied limits
			if f.Limit != nil && len(cachedEvents) >= int(*f.Limit) {
				log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
				return
			}
			// Fall through so a subscription is created for ongoing updates.
			// NOTE(review): falling through also re-runs the query below and
			// sends its results followed by a second EOSE for the same
			// subscription; confirm whether that is intended.
		}
	}
	// Collect all events from all filters
	var allEvents event.S
	for _, f := range *env.Filters {
		// A nil filter has nothing to query; skipping it here keeps it away
		// from QueryEvents.
		if f == nil {
			continue
		}
		// Summarize filter details for diagnostics (avoid internal fields)
		var kindsLen int
		if f.Kinds != nil {
			kindsLen = f.Kinds.Len()
		}
		var authorsLen int
		if f.Authors != nil {
			authorsLen = f.Authors.Len()
		}
		var idsLen int
		if f.Ids != nil {
			idsLen = f.Ids.Len()
		}
		var dtag string
		if f.Tags != nil {
			if d := f.Tags.GetFirst([]byte("d")); d != nil {
				dtag = string(d.Value())
			}
		}
		var lim any
		if f.Limit != nil {
			lim = *f.Limit
		}
		var since any
		if f.Since != nil {
			since = f.Since.Int()
		}
		var until any
		if f.Until != nil {
			until = f.Until.Int()
		}
		log.T.C(
			func() string {
				return fmt.Sprintf(
					"REQ %s filter: kinds.len=%d authors.len=%d ids.len=%d d=%q limit=%v since=%v until=%v",
					env.Subscription, kindsLen, authorsLen, idsLen, dtag,
					lim, since, until,
				)
			},
		)
		// Process large author lists by breaking them into chunks
		if f.Authors != nil && f.Authors.Len() > 1000 {
			log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len())
			// chunkSize is the number of authors queried at once. It is
			// derived from ClientMessageSizeLimit and divided further when
			// the filter names kinds, to limit the number of index ranges
			// per query.
			chunkSize := ClientMessageSizeLimit / 200
			if f.Kinds != nil && f.Kinds.Len() > 0 {
				chunkSize = chunkSize / f.Kinds.Len()
			}
			// Enforce the minimum unconditionally: a zero chunk size would
			// stop the loop below from ever advancing.
			if chunkSize < 100 {
				chunkSize = 100
			}
			// Process authors in chunks
			for i := 0; i < f.Authors.Len(); i += chunkSize {
				end := i + chunkSize
				if end > f.Authors.Len() {
					end = f.Authors.Len()
				}
				// Create a chunk filter that differs from f only in Authors
				chunkAuthors := tag.NewFromBytesSlice(f.Authors.T[i:end]...)
				chunkFilter := &filter.F{
					Kinds:   f.Kinds,
					Authors: chunkAuthors,
					Ids:     f.Ids,
					Tags:    f.Tags,
					Since:   f.Since,
					Until:   f.Until,
					Limit:   f.Limit,
					Search:  f.Search,
				}
				log.T.F("REQ %s: processing chunk %d-%d of %d authors", env.Subscription, i+1, end, f.Authors.Len())
				// Process this chunk
				var chunkEvents event.S
				if chunkEvents, err = l.QueryEvents(queryCtx, chunkFilter); chk.E(err) {
					// A closed database is fatal for the request; any other
					// failure only skips this chunk.
					if errors.Is(err, badger.ErrDBClosed) {
						return
					}
					log.E.F("QueryEvents failed for chunk filter: %v", err)
					err = nil
					continue
				}
				// Add chunk results to overall results
				allEvents = append(allEvents, chunkEvents...)
				// Check if we've hit the limit
				if f.Limit != nil && len(allEvents) >= int(*f.Limit) {
					log.T.F("REQ %s: reached limit of %d events, stopping chunk processing", env.Subscription, *f.Limit)
					break
				}
			}
			// Skip the normal processing since we handled it in chunks
			continue
		}
		// A limit of zero asks for no stored events
		if pointers.Present(f.Limit) {
			if *f.Limit == 0 {
				continue
			}
		}
		var filterEvents event.S
		if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
			// A closed database is fatal for the request; any other failure
			// only skips this filter.
			if errors.Is(err, badger.ErrDBClosed) {
				return
			}
			log.E.F("QueryEvents failed for filter: %v", err)
			err = nil
			continue
		}
		// Append events from this filter to the overall collection
		allEvents = append(allEvents, filterEvents...)
	}
	events = allEvents
	// The closure reads events when HandleReq returns, so it frees whatever
	// slice events refers to at that point.
	defer func() {
		for _, ev := range events {
			ev.Free()
		}
	}()
	var tmp event.S
	for _, ev := range events {
		// Check for private tag first
		privateTags := ev.Tags.GetAll([]byte("private"))
		if len(privateTags) > 0 && accessLevel != "admin" {
			pk := l.authedPubkey.Load()
			if len(pk) == 0 {
				continue // no auth, can't access private events
			}
			// Convert authenticated pubkey to npub for comparison
			authedNpub, npubErr := bech32encoding.BinToNpub(pk)
			if npubErr != nil {
				continue // couldn't convert pubkey, skip
			}
			// Check if authenticated npub is in any private tag; each tag
			// value is a comma-separated list of npubs.
			authorized := false
			for _, privateTag := range privateTags {
				authorizedNpubs := strings.Split(
					string(privateTag.Value()), ",",
				)
				for _, npub := range authorizedNpubs {
					if strings.TrimSpace(npub) == string(authedNpub) {
						authorized = true
						break
					}
				}
				if authorized {
					break
				}
			}
			if !authorized {
				continue // not authorized to see this private event
			}
			// Event has private tag and user is authorized - continue to privileged check
		}
		// Filter privileged events based on kind when ACL is active.
		// When ACL is "none", skip privileged filtering to allow open access.
		// Privileged events are only sent to callers for whom
		// policy.IsPartyInvolved reports true, or to admins.
		aclActive := acl.Registry.Active.Load() != "none"
		if kind.IsPrivileged(ev.Kind) && aclActive && accessLevel != "admin" { // admins can see all events
			log.T.C(
				func() string {
					return fmt.Sprintf(
						"checking privileged event %0x", ev.ID,
					)
				},
			)
			pk := l.authedPubkey.Load()
			// Use centralized IsPartyInvolved function for consistent privilege checking
			if policy.IsPartyInvolved(ev, pk) {
				log.T.C(
					func() string {
						// The event ID is raw bytes, so it is logged as hex.
						return fmt.Sprintf(
							"privileged event %0x allowed for logged in pubkey %0x",
							ev.ID, pk,
						)
					},
				)
				tmp = append(tmp, ev)
			} else {
				log.T.C(
					func() string {
						return fmt.Sprintf(
							"privileged event %0x denied for pubkey %0x (not authenticated or not a party involved)",
							ev.ID, pk,
						)
					},
				)
			}
		} else {
			// Policy-defined privileged events are left to the policy check
			// that runs after this loop, to avoid duplicating its logic here.
			tmp = append(tmp, ev)
		}
	}
	events = tmp
	// Apply policy filtering for read access if policy is enabled
	if l.policyManager != nil && l.policyManager.IsEnabled() {
		var policyFilteredEvents event.S
		for _, ev := range events {
			allowed, policyErr := l.policyManager.CheckPolicy("read", ev, l.authedPubkey.Load(), l.remote)
			if chk.E(policyErr) {
				log.E.F("policy check failed for read: %v", policyErr)
				// Default to allow on policy error
				policyFilteredEvents = append(policyFilteredEvents, ev)
				continue
			}
			if allowed {
				policyFilteredEvents = append(policyFilteredEvents, ev)
			} else {
				log.D.F("policy filtered out event %0x for read access", ev.ID)
			}
		}
		events = policyFilteredEvents
	}
	// Deduplicate events (in case chunk processing returned duplicates)
	// Use events (already filtered for privileged/policy) instead of allEvents
	if len(events) > 0 {
		seen := make(map[string]struct{})
		var deduplicatedEvents event.S
		originalCount := len(events)
		for _, ev := range events {
			eventID := hexenc.Enc(ev.ID)
			if _, exists := seen[eventID]; !exists {
				seen[eventID] = struct{}{}
				deduplicatedEvents = append(deduplicatedEvents, ev)
			}
		}
		events = deduplicatedEvents
		if originalCount != len(events) {
			log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(events))
		}
	}
	// Apply managed ACL filtering for read access if managed ACL is active.
	// A lookup that returns an error is treated as "not banned"/"allowed".
	if acl.Registry.Active.Load() == "managed" {
		var aclFilteredEvents event.S
		for _, ev := range events {
			// Check if event is banned
			eventID := hex.EncodeToString(ev.ID)
			if banned, err := l.getManagedACL().IsEventBanned(eventID); err == nil && banned {
				log.D.F("managed ACL filtered out banned event %s", hexenc.Enc(ev.ID))
				continue
			}
			// Check if event author is banned
			authorHex := hex.EncodeToString(ev.Pubkey)
			if banned, err := l.getManagedACL().IsPubkeyBanned(authorHex); err == nil && banned {
				log.D.F("managed ACL filtered out event %s from banned pubkey %s", hexenc.Enc(ev.ID), authorHex)
				continue
			}
			// Check if event kind is allowed (only if allowed kinds are configured)
			if allowed, err := l.getManagedACL().IsKindAllowed(int(ev.Kind)); err == nil && !allowed {
				allowedKinds, err := l.getManagedACL().ListAllowedKinds()
				if err == nil && len(allowedKinds) > 0 {
					log.D.F("managed ACL filtered out event %s with disallowed kind %d", hexenc.Enc(ev.ID), ev.Kind)
					continue
				}
			}
			aclFilteredEvents = append(aclFilteredEvents, ev)
		}
		events = aclFilteredEvents
	}
	// Apply private tag filtering - only show events with "private" tags to
	// callers accepted by canSeePrivateEvent. Only the first "private" tag
	// of each event is consulted here.
	var privateFilteredEvents event.S
	authedPubkey := l.authedPubkey.Load()
	for _, ev := range events {
		// Check if event has private tags
		hasPrivateTag := false
		var privatePubkey []byte
		if ev.Tags != nil && ev.Tags.Len() > 0 {
			for _, t := range *ev.Tags {
				if t.Len() >= 2 {
					if string(t.Key()) == "private" {
						hasPrivateTag = true
						privatePubkey = t.Value()
						break
					}
				}
			}
		}
		// If no private tag, include the event
		if !hasPrivateTag {
			privateFilteredEvents = append(privateFilteredEvents, ev)
			continue
		}
		// Event has private tag - check if user is authorized to see it
		canSeePrivate := l.canSeePrivateEvent(authedPubkey, privatePubkey)
		if canSeePrivate {
			privateFilteredEvents = append(privateFilteredEvents, ev)
			log.D.F("private tag: allowing event %s for authorized user", hexenc.Enc(ev.ID))
		} else {
			log.D.F("private tag: filtering out event %s from unauthorized user", hexenc.Enc(ev.ID))
		}
	}
	events = privateFilteredEvents
	// seen records the hex IDs of the events actually sent; it is used below
	// to trim ID filters before subscribing.
	seen := make(map[string]struct{})
	// Cache events for single-filter queries (without subscription ID)
	shouldCache := len(*env.Filters) == 1 && len(events) > 0
	for _, ev := range events {
		log.T.C(
			func() string {
				return fmt.Sprintf(
					"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
					hexenc.Enc(ev.ID), ev.Kind,
				)
			},
		)
		log.T.C(
			func() string {
				return fmt.Sprintf("event:\n%s\n", ev.Serialize())
			},
		)
		var res *eventenvelope.Result
		if res, err = eventenvelope.NewResultWith(
			env.Subscription, ev,
		); chk.E(err) {
			return
		}
		if err = res.Write(l); err != nil {
			// Don't log context canceled errors as they're expected during shutdown
			if !strings.Contains(err.Error(), "context canceled") {
				chk.E(err)
			}
			return
		}
		// track the IDs we've sent (use hex encoding for stable key)
		seen[hexenc.Enc(ev.ID)] = struct{}{}
	}
	// Populate cache after successfully sending all events
	// Cache the events themselves (not marshaled JSON with subscription ID)
	if shouldCache {
		f := (*env.Filters)[0]
		l.DB.CacheEvents(f, events)
		log.D.F("REQ %s: cached %d events", env.Subscription, len(events))
	}
	// write the EOSE to signal to the client that all events found have been
	// sent.
	log.T.F("sending EOSE to %s", l.remote)
	if err = eoseenvelope.NewFrom(env.Subscription).
		Write(l); chk.E(err) {
		return
	}
	// Decide which filters still need a live subscription. A filter is
	// dropped when its limit was satisfied or when every ID it named has
	// already been sent; if no filter remains, no subscription is created.
	cancel := true
	log.T.F(
		"REQ %s: computing cancel/subscription; events_sent=%d",
		env.Subscription, len(events),
	)
	var subbedFilters filter.S
	for _, f := range *env.Filters {
		// A nil filter cannot be subscribed to, and reading its fields
		// would panic.
		if f == nil {
			continue
		}
		// Check if this filter's limit was satisfied
		limitSatisfied := false
		if pointers.Present(f.Limit) {
			if len(events) >= int(*f.Limit) {
				limitSatisfied = true
			}
		}
		if f.Ids.Len() < 1 {
			// Filter has no IDs - keep subscription open unless limit was satisfied
			if !limitSatisfied {
				cancel = false
				subbedFilters = append(subbedFilters, f)
			}
		} else {
			// remove the IDs that we already sent, as it's one less
			// comparison we have to make.
			var notFounds [][]byte
			for _, id := range f.Ids.T {
				if _, ok := seen[hexenc.Enc(id)]; ok {
					continue
				}
				notFounds = append(notFounds, id)
			}
			log.T.F(
				"REQ %s: ids outstanding=%d of %d", env.Subscription,
				len(notFounds), f.Ids.Len(),
			)
			// if all were found, don't add to subbedFilters
			if len(notFounds) == 0 {
				continue
			}
			// Check if limit was satisfied
			if limitSatisfied {
				continue
			}
			// rewrite the filter Ids to remove the ones we already sent
			f.Ids = tag.NewFromBytesSlice(notFounds...)
			// add the filter to the list of filters we're subscribing to
			cancel = false
			subbedFilters = append(subbedFilters, f)
		}
	}
	// Create the live subscription unless every filter was fully satisfied
	if !cancel {
		// The channel is only needed when a subscription is actually created
		receiver := make(event.C, 32)
		// Create a dedicated context for this subscription that's independent of query context
		// but is child of the listener context so it gets cancelled when connection closes
		subCtx, subCancel := context.WithCancel(l.ctx)
		// Track this subscription so we can cancel it on CLOSE or connection close
		subID := string(env.Subscription)
		l.subscriptionsMu.Lock()
		l.subscriptions[subID] = subCancel
		l.subscriptionsMu.Unlock()
		// Register subscription with publisher
		// Set AuthRequired based on ACL mode - when ACL is "none", don't require auth for privileged events
		authRequired := acl.Registry.Active.Load() != "none"
		l.publishers.Receive(
			&W{
				Conn:         l.conn,
				remote:       l.remote,
				Id:           subID,
				Receiver:     receiver,
				Filters:      &subbedFilters,
				AuthedPubkey: l.authedPubkey.Load(),
				AuthRequired: authRequired,
			},
		)
		// Launch a goroutine that drains the receiver channel and forwards
		// each event to the client. It exits when subCtx is cancelled or
		// the channel is closed.
		go func() {
			defer func() {
				// Clean up when subscription ends
				l.subscriptionsMu.Lock()
				delete(l.subscriptions, subID)
				l.subscriptionsMu.Unlock()
				log.D.F("subscription goroutine exiting for %s @ %s", subID, l.remote)
			}()
			for {
				select {
				case <-subCtx.Done():
					// Subscription cancelled (CLOSE message or connection closing)
					log.D.F("subscription %s cancelled for %s", subID, l.remote)
					return
				case ev, ok := <-receiver:
					if !ok {
						// Channel closed - subscription ended
						log.D.F("subscription %s receiver channel closed for %s", subID, l.remote)
						return
					}
					// Forward event to client
					var res *eventenvelope.Result
					var err error
					if res, err = eventenvelope.NewResultWith(subID, ev); chk.E(err) {
						log.E.F("failed to create event envelope for subscription %s: %v", subID, err)
						continue
					}
					if err = res.Write(l); err != nil {
						if !strings.Contains(err.Error(), "context canceled") {
							log.E.F("failed to write event to subscription %s @ %s: %v", subID, l.remote, err)
						}
						// Don't return here - a write error does not end the
						// subscription; the loop ends via subCtx or a closed
						// receiver channel.
						continue
					}
					log.D.F("delivered real-time event %s to subscription %s @ %s",
						hexenc.Enc(ev.ID), subID, l.remote)
				}
			}
		}()
		log.D.F("subscription %s created and goroutine launched for %s", subID, l.remote)
	} else {
		// suppress server-sent CLOSED; client will close subscription if desired
		log.D.F("subscription request cancelled immediately (all IDs found or limit satisfied)")
	}
	log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
	return
}