133 lines
3.7 KiB
Go
133 lines
3.7 KiB
Go
package openapi
|
|
|
|
import (
|
|
"errors"
|
|
"net/http"
|
|
|
|
"github.com/danielgtaylor/huma/v2"
|
|
"github.com/danielgtaylor/huma/v2/sse"
|
|
|
|
"realy.lol/chk"
|
|
"realy.lol/context"
|
|
"realy.lol/event"
|
|
"realy.lol/filter"
|
|
"realy.lol/filters"
|
|
"realy.lol/hex"
|
|
"realy.lol/httpauth"
|
|
"realy.lol/kind"
|
|
"realy.lol/kinds"
|
|
"realy.lol/log"
|
|
"realy.lol/publish"
|
|
"realy.lol/realy/helpers"
|
|
"realy.lol/tag"
|
|
"realy.lol/tags"
|
|
)
|
|
|
|
type SubscribeInput struct {
|
|
Auth string `header:"Authorization" doc:"nostr nip-98 (and expiring variant)" required:"false"`
|
|
Accept string `header:"Accept" default:"text/event-stream" enum:"text/event-stream" required:"true"`
|
|
// ContentType string `header:"Content-Type" default:"text/event-stream" enum:"text/event-stream" required:"true"`
|
|
Body SimpleFilter `body:"filter" doc:"filter criteria to match for events to return"`
|
|
}
|
|
|
|
func (fi SubscribeInput) ToFilter() (f *filter.T, err error) {
|
|
f = filter.New()
|
|
var ks []*kind.T
|
|
for _, k := range fi.Body.Kinds {
|
|
ks = append(ks, kind.New(k))
|
|
}
|
|
f.Kinds = kinds.New(ks...)
|
|
var as [][]byte
|
|
for _, a := range fi.Body.Authors {
|
|
var b []byte
|
|
if b, err = hex.Dec(a); chk.E(err) {
|
|
return
|
|
}
|
|
as = append(as, b)
|
|
}
|
|
f.Authors = tag.New(as...)
|
|
var ts []*tag.T
|
|
for _, t := range fi.Body.Tags {
|
|
ts = append(ts, tag.New(t...))
|
|
}
|
|
f.Tags = tags.New(ts...)
|
|
return
|
|
}
|
|
|
|
func (x *Operations) RegisterSubscribe(api huma.API) {
|
|
name := "Subscribe"
|
|
description := "Subscribe for newly published events by author, kind or tags; empty also allowed, which just sends all incoming events - uses Server Sent Events format for compatibility with standard libraries."
|
|
path := x.path + "/subscribe"
|
|
scopes := []string{"user", "read"}
|
|
method := http.MethodPost
|
|
sse.Register(api, huma.Operation{
|
|
OperationID: name,
|
|
Summary: name,
|
|
Path: path,
|
|
Method: method,
|
|
Tags: []string{"events"},
|
|
Description: helpers.GenerateDescription(description, scopes),
|
|
Security: []map[string][]string{{"auth": scopes}},
|
|
},
|
|
map[string]any{
|
|
"event": event.J{},
|
|
},
|
|
func(ctx context.T, input *SubscribeInput, send sse.Sender) {
|
|
var err error
|
|
var f *filter.T
|
|
if f, err = input.ToFilter(); chk.E(err) {
|
|
err = huma.Error422UnprocessableEntity(err.Error())
|
|
return
|
|
}
|
|
log.I.F("%s", f.Marshal(nil))
|
|
r := ctx.Value("http-request").(*http.Request)
|
|
remote := helpers.GetRemoteFromReq(r)
|
|
var valid bool
|
|
var pubkey []byte
|
|
valid, pubkey, err = httpauth.CheckAuth(r)
|
|
// if there is an error but not that the token is missing, or there is no error
|
|
// but the signature is invalid, return error that request is unauthorized.
|
|
if err != nil && !errors.Is(err, httpauth.ErrMissingKey) {
|
|
err = huma.Error400BadRequest(err.Error())
|
|
return
|
|
}
|
|
err = nil
|
|
if !valid {
|
|
err = huma.Error401Unauthorized("Authorization header is invalid")
|
|
return
|
|
}
|
|
allowed := filters.New(f)
|
|
var accepted, modified bool
|
|
allowed, accepted, modified = x.Server.AcceptReq(x.Context(), r, nil, filters.New(f), remote)
|
|
if !accepted {
|
|
err = huma.Error401Unauthorized("auth to get access for this filter")
|
|
return
|
|
} else if modified {
|
|
log.D.F("filter modified %s", allowed.F[0])
|
|
}
|
|
if len(allowed.F) == 0 {
|
|
err = huma.Error401Unauthorized("all kinds in event restricted; auth to get access for this filter")
|
|
return
|
|
}
|
|
// register the filter with the listeners
|
|
receiver := make(event.C, 32)
|
|
publish.P.Receive(&H{
|
|
Ctx: r.Context(),
|
|
Receiver: receiver,
|
|
Pubkey: pubkey,
|
|
Filter: f,
|
|
})
|
|
out:
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
break out
|
|
case ev := <-receiver:
|
|
if err = send.Data(ev.ToEventJ()); chk.E(err) {
|
|
}
|
|
}
|
|
}
|
|
return
|
|
})
|
|
}
|