working http SSE subscribe

and fixed non-authed public readable subscriptions
This commit is contained in:
2025-03-29 09:11:31 -01:06
parent f890bb71e0
commit dc86c9b297
10 changed files with 107 additions and 102 deletions

View File

@@ -2,7 +2,6 @@ package httpauth
import (
"encoding/base64"
"errors"
"fmt"
"net/http"
"strings"
@@ -13,8 +12,8 @@ import (
"realy.lol/tag"
)
var ErrMissingKey = errors.New(fmt.Sprintf(
"'%s' key missing from request header", HeaderKey))
var ErrMissingKey = fmt.Errorf(
"'%s' key missing from request header", HeaderKey)
// CheckAuth verifies a received http.Request has got a valid
// authentication event or token in it, and provides the public key that should be

View File

@@ -65,7 +65,7 @@ func (s *Server) addEvent(c context.T, rl relay.I, ev *event.T,
if ar, ok := rl.(relay.Authenticator); ok {
authRequired = ar.AuthEnabled()
}
s.Listeners.NotifyListeners(authRequired, ev)
s.Listeners.NotifyListeners(authRequired, s.publicReadable, ev)
accepted = true
log.I.F("event id %0x stored", ev.ID)
return

View File

@@ -68,7 +68,9 @@ func (ep *Event) RegisterEvent(api huma.API) {
// but the signature is invalid, return error that request is unauthorized.
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
err = huma.Error400BadRequest(err.Error())
return
}
err = nil
if !valid {
err = huma.Error401Unauthorized("Authorization header is invalid")
return
@@ -218,10 +220,11 @@ func (ep *Event) RegisterEvent(api huma.API) {
output = &EventOutput{"event accepted"}
// notify subscribers
var authRequired bool
if ar, ok := s.relay.(relay.Authenticator); ok {
var ar relay.Authenticator
if ar, ok = s.relay.(relay.Authenticator); ok {
authRequired = ar.AuthEnabled()
}
s.Listeners.NotifyListeners(authRequired, ev)
s.Listeners.NotifyListeners(authRequired, s.publicReadable, ev)
return
})
}

View File

@@ -62,6 +62,13 @@ func (ep *Events) RegisterEvents(api huma.API) {
var valid bool
var pubkey []byte
valid, pubkey, err = httpauth.CheckAuth(r, ep.JWTVerifyFunc)
// if there is an error but not that the token is missing, or there is no error
// but the signature is invalid, return error that request is unauthorized.
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
err = huma.Error400BadRequest(err.Error())
return
}
err = nil
if authrequired && len(pubkey) != schnorr.PubKeyBytesLen {
err = huma.Error400BadRequest(
"cannot process more than 1000 events in a request without being authenticated")
@@ -85,12 +92,6 @@ func (ep *Events) RegisterEvents(api huma.API) {
}
}
}
// if there is an error but not that the token is missing, or there is no error
// but the signature is invalid, return error that request is unauthorized.
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
err = huma.Error400BadRequest(err.Error())
return
}
if !valid {
err = huma.Error401Unauthorized("Authorization header is invalid")
return

View File

@@ -57,50 +57,8 @@ func (ep *Export) RegisterExport(api huma.API) {
} else {
log.W.F("error: unable to flush")
}
},
}
return
})
}
// func (ep *Export) RegisterExport(api huma.API) {
// name := "Export"
// description := "Export all events (only works with NIP-98/JWT capable client, will not work with UI), produces standard HTTP SSE format - event: event (new line) data: (wire format json of event) (newline)"
// path := "/export"
// scopes := []string{"admin", "read"}
// method := http.MethodGet
// sse.Register(api, huma.Operation{
// OperationID: name,
// Summary: name,
// Path: path,
// Method: method,
// Tags: []string{"admin"},
// Description: generateDescription(description, scopes),
// Security: []map[string][]string{{"auth": scopes}},
// DefaultStatus: 204,
// }, map[string]any{
// // "event": event.J{},
// }, func(ctx context.T, input *ExportInput, send sse.Sender) {
// s := ep.Server
// w := make(chan *event.J)
// var err error
// go func() {
// // start up the sender
// for ev := range w {
// send(sse.Message{})
// if err = send.Data(ev); chk.E(err) {
// return
// }
// }
// }()
// sto := s.relay.Storage()
// if exporter, ok := sto.(store.ConcurrentExporter); ok {
// // start the exporter
// exporter.ExportConcurrent(s.Ctx, w)
// } else {
// log.I.F("no concurrent exporter available")
// return
// }
// })
// }

View File

@@ -106,19 +106,21 @@ func (ep *Filter) RegisterFilter(api huma.API) {
// w := ctx.Value("http-response").(http.ResponseWriter)
rr := GetRemoteFromReq(r)
// s := ep.Server
var valid bool
var pubkey []byte
valid, pubkey, err = httpauth.CheckAuth(r, ep.JWTVerifyFunc)
if len(input.Body.Authors) < 1 && len(input.Body.Kinds) < 1 && len(input.Body.Tags) < 1 {
err = huma.Error400BadRequest(
"cannot process filter with none of Authors/Kinds/Tags")
return
}
var valid bool
var pubkey []byte
valid, pubkey, err = httpauth.CheckAuth(r, ep.JWTVerifyFunc)
// if there is an error but not that the token is missing, or there is no error
// but the signature is invalid, return error that request is unauthorized.
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
err = huma.Error400BadRequest(err.Error())
return
}
err = nil
if !valid {
err = huma.Error401Unauthorized("Authorization header is invalid")
return

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/http"
"time"
"github.com/danielgtaylor/huma/v2"
@@ -19,7 +20,6 @@ import (
"realy.lol/relay"
"realy.lol/tag"
"realy.lol/tags"
"realy.lol/timestamp"
)
type Subscribe struct{ *Server }
@@ -27,12 +27,8 @@ type Subscribe struct{ *Server }
func NewSubscribe(s *Server) (ep *Subscribe) { return &Subscribe{Server: s} }
type SubscribeInput struct {
Auth string `header:"Authorization" doc:"nostr nip-98 or JWT token for authentication" required:"false" example:"Bearer eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJhbGciOiJFUzI1N2ZGFkNjZlNDdkYjJmIiwic3ViIjoiaHR0cDovLzEyNy4wLjAuMSJ9.cHT_pB3wTLxUNOqxYL6fxAYUJXNKBXcOnYLlkO1nwa7BHr9pOTQzNywJpc3MM2I0N2UziOiI0YzgwMDI1N2E1ODhhODI4NDlkMDIsImV4cCIQ5ODE3YzJiZGFhZDk4NGMgYtGi6MTc0Mjg40NWFkOWYCzvHyiXtIyNWEVZiaWF0IjoxNzQyNjMwMjM3LClZPtt0w_dJxEpYcSIEcY4wg"`
Since int64 `query:"since" doc:"timestamp of the oldest events to return (inclusive)"`
Until int64 `query:"until" doc:"timestamp of the newest events to return (inclusive)"`
Limit uint `query:"limit" doc:"maximum number of results to return"`
Sort string `query:"sort" enum:"asc,desc" default:"desc" doc:"sort order by created_at timestamp"`
Body SimpleFilter `body:"filter" doc:"filter criteria to match for events to return"`
Auth string `header:"Authorization" doc:"nostr nip-98 or JWT token for authentication" required:"false" example:"Bearer eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJhbGciOiJFUzI1N2ZGFkNjZlNDdkYjJmIiwic3ViIjoiaHR0cDovLzEyNy4wLjAuMSJ9.cHT_pB3wTLxUNOqxYL6fxAYUJXNKBXcOnYLlkO1nwa7BHr9pOTQzNywJpc3MM2I0N2UziOiI0YzgwMDI1N2E1ODhhODI4NDlkMDIsImV4cCIQ5ODE3YzJiZGFhZDk4NGMgYtGi6MTc0Mjg40NWFkOWYCzvHyiXtIyNWEVZiaWF0IjoxNzQyNjMwMjM3LClZPtt0w_dJxEpYcSIEcY4wg"`
Body SimpleFilter `body:"filter" doc:"filter criteria to match for events to return"`
}
func (fi SubscribeInput) ToFilter() (f *filter.T, err error) {
@@ -56,21 +52,12 @@ func (fi SubscribeInput) ToFilter() (f *filter.T, err error) {
ts = append(ts, tag.New(t...))
}
f.Tags = tags.New(ts...)
if fi.Limit != 0 {
f.Limit = &fi.Limit
}
if fi.Since != 0 {
f.Since = timestamp.NewFromUnix(fi.Since)
}
if fi.Until != 0 {
f.Until = timestamp.NewFromUnix(fi.Until)
}
return
}
func (ep *Subscribe) RegisterSubscribe(api huma.API) {
name := "Subscribe"
description := "Search for events and receive a sorted list of event Ids (one of authors, kinds or tags must be present)"
description := "Subscribe for newly published events by author, kind or tags (empty also allowed)"
path := "/subscribe"
scopes := []string{"user", "read"}
method := http.MethodPost
@@ -97,16 +84,13 @@ func (ep *Subscribe) RegisterSubscribe(api huma.API) {
var valid bool
var pubkey []byte
valid, pubkey, err = httpauth.CheckAuth(r, ep.JWTVerifyFunc)
if len(input.Body.Authors) < 1 && len(input.Body.Kinds) < 1 && len(input.Body.Tags) < 1 {
err = huma.Error400BadRequest(
"cannot process filter with none of Authors/Kinds/Tags")
return
}
// if there is an error but not that the token is missing, or there is no error
// but the signature is invalid, return error that request is unauthorized.
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
err = huma.Error400BadRequest(err.Error())
return
}
err = nil
if !valid {
err = huma.Error401Unauthorized("Authorization header is invalid")
return
@@ -151,14 +135,24 @@ func (ep *Subscribe) RegisterSubscribe(api huma.API) {
}
// register the filter with the Listeners
receiver := make(event.C, 32)
s.Listeners.Hchan <- listeners.H{Ctx: r.Context(), Receiver: receiver}
s.Listeners.Hchan <- listeners.H{
Ctx: r.Context(),
Receiver: receiver,
Pubkey: pubkey,
Filter: f,
}
output = &huma.StreamResponse{
func(ctx huma.Context) {
ctx.SetHeader("Content-Type", "application/x-jsonl")
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("X-Accel-Buffering", "no")
ctx.SetHeader("Cache-Control", "no-cache")
w := ctx.BodyWriter()
tick := time.NewTicker(time.Second)
out:
for {
select {
case <-tick.C:
log.I.F("tick")
case <-r.Context().Done():
break out
case ev := <-receiver:

View File

@@ -16,6 +16,7 @@ import (
"realy.lol/ec/bech32"
"realy.lol/envelopes/eventenvelope"
"realy.lol/event"
"realy.lol/filter"
"realy.lol/filters"
"realy.lol/tag"
"realy.lol/units"
@@ -34,15 +35,19 @@ type (
// Receiver is a channel that the listener sends subscription events to for http
// subscribe endpoint.
Receiver event.C
// Pubkey is the pubkey authed to this subscription
Pubkey []byte
// Filter is the filter associated with the http subscription
Filter *filter.T
}
HMap map[*H]struct{}
Subs map[*H]struct{}
T struct {
Ctx context.T
sync.Mutex
Map
HMap
Subs
Hchan chan H
Hlock sync.Mutex
ChallengeHRP string
@@ -75,7 +80,7 @@ func New(ctx context.T) (l *T) {
l = &T{
Ctx: ctx,
Map: make(Map),
HMap: make(HMap),
Subs: make(Subs),
Hchan: make(chan H),
ChallengeHRP: DefaultChallengeHRP,
WriteWait: DefaultWriteWait,
@@ -91,7 +96,7 @@ func New(ctx context.T) (l *T) {
return
case h := <-l.Hchan:
l.Hlock.Lock()
l.HMap[&h] = struct{}{}
l.Subs[&h] = struct{}{}
l.Hlock.Unlock()
}
}
@@ -147,17 +152,18 @@ func (l *T) RemoveListener(ws *web.Socket) {
l.Mutex.Unlock()
}
func (l *T) NotifyListeners(authRequired bool, ev *event.T) {
func (l *T) NotifyListeners(authRequired, publicReadable bool, ev *event.T) {
if ev == nil {
return
}
var err error
l.Mutex.Lock()
defer l.Mutex.Unlock()
for ws, subs := range l.Map {
for id, listener := range subs {
if authRequired && !ws.IsAuthed() {
continue
if !publicReadable {
if authRequired && !ws.IsAuthed() {
continue
}
}
if !listener.filters.Match(ev) {
continue
@@ -170,7 +176,7 @@ func (l *T) NotifyListeners(authRequired bool, ev *event.T) {
}
if !bytes.Equal(ev.PubKey, ab) || containsPubkey {
log.I.F("authed user %0x not privileged to receive event\n%s",
ws.AuthedBytes(), ev.Serialize())
ab, ev.Serialize())
continue
}
}
@@ -183,4 +189,45 @@ func (l *T) NotifyListeners(authRequired bool, ev *event.T) {
}
}
}
l.Mutex.Unlock()
l.Hlock.Lock()
var subs []*H
for sub := range l.Subs {
// check if the subscription's subscriber is still alive
select {
case <-sub.Ctx.Done():
subs = append(subs, sub)
default:
}
}
for _, sub := range subs {
delete(l.Subs, sub)
}
subs = subs[:0]
for sub := range l.Subs {
// if auth required, check the subscription pubkey matches
if !publicReadable {
if authRequired && len(sub.Pubkey) == 0 {
continue
}
}
// if the filter doesn't match, skip
if !sub.Filter.Matches(ev) {
continue
}
// if the filter is privileged and the user doesn't have matching auth, skip
if ev.Kind.IsPrivileged() {
ab := sub.Pubkey
var containsPubkey bool
if ev.Tags != nil {
containsPubkey = ev.Tags.ContainsAny([]byte{'p'}, tag.New(ab))
}
if !bytes.Equal(ev.PubKey, ab) || containsPubkey {
continue
}
}
// send the event to the subscriber
sub.Receiver <- ev
}
l.Hlock.Unlock()
}

View File

@@ -71,7 +71,7 @@ func NewServer(sp *ServerParams, opts ...options.O) (*Server, error) {
return nil, fmt.Errorf("realy init: %w", err)
}
serveMux := NewServeMux()
srv := &Server{
s := &Server{
Ctx: sp.Ctx,
Cancel: sp.Cancel,
relay: sp.Rl,
@@ -86,25 +86,26 @@ func NewServer(sp *ServerParams, opts ...options.O) (*Server, error) {
Listeners: listeners.New(sp.Ctx),
API: NewHuma(serveMux, sp.Rl.Name(), realy_lol.Version, realy_lol.Description),
}
huma.AutoRegister(srv.API, NewEvent(srv))
huma.AutoRegister(srv.API, NewFilter(srv))
huma.AutoRegister(srv.API, NewEvents(srv))
huma.AutoRegister(s.API, NewEvent(s))
huma.AutoRegister(s.API, NewFilter(s))
huma.AutoRegister(s.API, NewEvents(s))
huma.AutoRegister(s.API, NewSubscribe(s))
huma.AutoRegister(srv.API, NewExport(srv))
huma.AutoRegister(srv.API, NewImport(srv))
huma.AutoRegister(s.API, NewExport(s))
huma.AutoRegister(s.API, NewImport(s))
huma.AutoRegister(srv.API, NewRescan(srv))
huma.AutoRegister(srv.API, NewShutdown(srv))
huma.AutoRegister(srv.API, NewNuke(srv))
huma.AutoRegister(s.API, NewRescan(s))
huma.AutoRegister(s.API, NewShutdown(s))
huma.AutoRegister(s.API, NewNuke(s))
if inj, ok := sp.Rl.(relay.Injector); ok {
go func() {
for ev := range inj.InjectEvents() {
srv.Listeners.NotifyListeners(srv.authRequired, ev)
s.Listeners.NotifyListeners(s.authRequired, s.publicReadable, ev)
}
}()
}
return srv, nil
return s, nil
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

View File

@@ -1 +1 @@
v1.10.4
v1.11.0