From af04f89df8c5f7931f7b87aaa32abc0a76b412dc Mon Sep 17 00:00:00 2001 From: mleku Date: Wed, 23 Jul 2025 04:10:50 +0100 Subject: [PATCH] Introduce ServeMux and OpenAPI export endpoint --- main.go | 21 ++++++++-- pkg/app/relay/admin-auth.go | 40 +++++++++++++++++++ pkg/app/relay/server.go | 11 ++++-- pkg/app/relay/testrelay.go | 3 ++ pkg/database/export.go | 26 +++++++++---- pkg/interfaces/server/server.go | 4 ++ pkg/protocol/openapi/export.go | 69 +++++++++++++++++++++++++++++++++ pkg/protocol/openapi/huma.go | 5 --- 8 files changed, 159 insertions(+), 20 deletions(-) create mode 100644 pkg/app/relay/admin-auth.go create mode 100644 pkg/protocol/openapi/export.go diff --git a/main.go b/main.go index dab2cb6..a1c3e00 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" _ "net/http/pprof" + "orly.dev/pkg/protocol/openapi" + "orly.dev/pkg/protocol/servemux" "os" "github.com/pkg/profile" @@ -50,8 +52,10 @@ func main() { }() } c, cancel := context.Cancel(context.Bg()) - storage, err := database.New(c, cancel, cfg.DataDir, cfg.DbLogLevel) - if chk.E(err) { + var storage *database.D + if storage, err = database.New( + c, cancel, cfg.DataDir, cfg.DbLogLevel, + ); chk.E(err) { os.Exit(1) } r := &app2.Relay{C: cfg, Store: storage} @@ -66,9 +70,20 @@ func main() { C: cfg, } var opts []options.O - if server, err = relay.NewServer(serverParams, opts...); chk.E(err) { + serveMux := servemux.NewServeMux() + if server, err = relay.NewServer( + serverParams, serveMux, opts..., + ); chk.E(err) { os.Exit(1) } + openapi.New( + server, + cfg.AppName, + version.V, + version.Description, + "/api", + serveMux, + ) if err != nil { log.F.F("failed to create server: %v", err) } diff --git a/pkg/app/relay/admin-auth.go b/pkg/app/relay/admin-auth.go new file mode 100644 index 0000000..3daa020 --- /dev/null +++ b/pkg/app/relay/admin-auth.go @@ -0,0 +1,40 @@ +package relay + +import ( + "bytes" + "net/http" + "orly.dev/pkg/protocol/httpauth" + "orly.dev/pkg/utils/chk" + "orly.dev/pkg/utils/log" + "time" +) + +func (s *Server) AdminAuth( + r *http.Request, remote string, + tolerance ...time.Duration, +) (authed bool, pubkey []byte) { + var valid bool + var err error + var tolerate time.Duration + if len(tolerance) > 0 { + tolerate = tolerance[0] + } + if valid, pubkey, err = httpauth.CheckAuth(r, tolerate); chk.E(err) { + return + } + if !valid { + log.E.F( + "invalid auth %s from %s", + r.Header.Get("Authorization"), remote, + ) + return + } + for _, pk := range s.ownersPubkeys { + + if bytes.Equal(pk, pubkey) { + authed = true + return + } + } + return +} diff --git a/pkg/app/relay/server.go b/pkg/app/relay/server.go index 08150f1..afe9235 100644 --- a/pkg/app/relay/server.go +++ b/pkg/app/relay/server.go @@ -37,6 +37,7 @@ type Server struct { listeners *publish.S *config.C *Lists + Mux *servemux.S } // ServerParams represents the configuration parameters for initializing a @@ -48,6 +49,7 @@ type ServerParams struct { Rl relay.I DbPath string MaxLimit int + Mux *servemux.S *config.C } @@ -78,7 +80,9 @@ type ServerParams struct { // - Sets up a ServeMux for handling HTTP requests. // // - Initializes the relay, starting its operation in a separate goroutine. -func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) { +func NewServer( + sp *ServerParams, serveMux *servemux.S, opts ...options.O, +) (s *Server, err error) { op := options.Default() for _, opt := range opts { opt(op) @@ -88,7 +92,6 @@ func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) { return nil, fmt.Errorf("storage init: %w", err) } } - serveMux := servemux.NewServeMux() s = &Server{ Ctx: sp.Ctx, Cancel: sp.Cancel, @@ -209,8 +212,8 @@ func (s *Server) Start( }() addr := net.JoinHostPort(host, strconv.Itoa(port)) log.I.F("starting relay listener at %s", addr) - ln, err := net.Listen("tcp", addr) - if err != nil { + var ln net.Listener + if ln, err = net.Listen("tcp", addr); err != nil { return err } s.httpServer = &http.Server{ diff --git a/pkg/app/relay/testrelay.go b/pkg/app/relay/testrelay.go index 2afd92f..091bce1 100644 --- a/pkg/app/relay/testrelay.go +++ b/pkg/app/relay/testrelay.go @@ -7,6 +7,7 @@ import ( "orly.dev/pkg/encoders/eventid" "orly.dev/pkg/encoders/filter" "orly.dev/pkg/interfaces/store" + "orly.dev/pkg/protocol/servemux" "orly.dev/pkg/utils/context" "orly.dev/pkg/utils/units" "testing" @@ -14,6 +15,7 @@ import ( func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server { t.Helper() + serveMux := servemux.NewServeMux() srv, _ := NewServer( &ServerParams{ Ctx: c, @@ -21,6 +23,7 @@ func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server { Rl: tr, MaxLimit: 500 * units.Kb, }, + serveMux, ) started := make(chan bool) go srv.Start("127.0.0.1", 0, started) diff --git a/pkg/database/export.go b/pkg/database/export.go index 0284722..bcc0a53 100644 --- a/pkg/database/export.go +++ b/pkg/database/export.go @@ -17,6 +17,8 @@ import ( // JSON. func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) { var err error + evB := make([]byte, 0, units.Mb) + evBuf := bytes.NewBuffer(evB) if len(pubkeys) == 0 { if err = d.View( func(txn *badger.Txn) (err error) { @@ -26,25 +28,29 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) { return } it := txn.NewIterator(badger.IteratorOptions{Prefix: buf.Bytes()}) - evB := make([]byte, 0, units.Mb) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { item := it.Item() - if evB, err = item.ValueCopy(evB); chk.E(err) { + if err = item.Value( + func(val []byte) (err error) { + evBuf.Write(val) + return + }, + ); chk.E(err) { continue } - evBuf := bytes.NewBuffer(evB) ev := event.New() if err = ev.UnmarshalBinary(evBuf); chk.E(err) { continue } // Serialize the event to JSON and write it to the output if _, err = w.Write(ev.Serialize()); chk.E(err) { - continue + return } if _, err = w.Write([]byte{'\n'}); chk.E(err) { - continue + return } + evBuf.Reset() } return }, @@ -67,14 +73,17 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) { return } it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()}) - evB := make([]byte, 0, units.Mb) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { item := it.Item() - if evB, err = item.ValueCopy(evB); chk.E(err) { + if err = item.Value( + func(val []byte) (err error) { + evBuf.Write(val) + return + }, + ); chk.E(err) { continue } - evBuf := bytes.NewBuffer(evB) ev := event.New() if err = ev.UnmarshalBinary(evBuf); chk.E(err) { continue @@ -86,6 +95,7 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) { if _, err = w.Write([]byte{'\n'}); chk.E(err) { continue } + evBuf.Reset() } return }, diff --git a/pkg/interfaces/server/server.go b/pkg/interfaces/server/server.go index 3696dfe..8c2bb5a 100644 --- a/pkg/interfaces/server/server.go +++ b/pkg/interfaces/server/server.go @@ -8,6 +8,7 @@ import ( "orly.dev/pkg/interfaces/relay" "orly.dev/pkg/interfaces/store" "orly.dev/pkg/utils/context" + "time" ) type I interface { @@ -22,6 +23,9 @@ type I interface { AddEvent( c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string, ) (accepted bool, message []byte) + AdminAuth( + r *http.Request, remote string, tolerance ...time.Duration, + ) (authed bool, pubkey []byte) Context() context.T Publisher() *publish.S Publish(c context.T, evt *event.E) (err error) diff --git a/pkg/protocol/openapi/export.go b/pkg/protocol/openapi/export.go new file mode 100644 index 0000000..a9dfc02 --- /dev/null +++ b/pkg/protocol/openapi/export.go @@ -0,0 +1,69 @@ +package openapi + +import ( + "github.com/danielgtaylor/huma/v2" + "net/http" + "orly.dev/pkg/app/relay/helpers" + "orly.dev/pkg/utils/context" + "orly.dev/pkg/utils/log" + "orly.dev/pkg/utils/lol" +) + +// ExportInput is the parameters for the HTTP API Export method. +type ExportInput struct { + Auth string `header:"Authorization" doc:"nostr nip-98 (and expiring variant)" required:"true"` +} + +// ExportOutput is the return value of Export. It usually will be line structured JSON. In +// future there may be more output formats. +type ExportOutput struct{ RawBody []byte } + +// RegisterExport implements the Export HTTP API method. +func (x *Operations) RegisterExport(api huma.API) { + lol.Tracer("RegisterExport") + defer func() { lol.Tracer("end RegisterExport") }() + name := "Export" + description := "Export all events (only works with NIP-98 capable client, will not work with UI)" + path := x.path + "/export" + scopes := []string{"admin", "read"} + method := http.MethodGet + huma.Register( + api, huma.Operation{ + OperationID: name, + Summary: name, + Path: path, + Method: method, + Tags: []string{"admin"}, + Description: helpers.GenerateDescription(description, scopes), + Security: []map[string][]string{{"auth": scopes}}, + }, func(ctx context.T, input *ExportInput) ( + resp *huma.StreamResponse, err error, + ) { + r := ctx.Value("http-request").(*http.Request) + remote := helpers.GetRemoteFromReq(r) + log.I.F("processing export from %s", remote) + authed, pubkey := x.AdminAuth(r, remote) + if !authed { + err = huma.Error401Unauthorized("Not Authorized") + return + } + log.I.F( + "%s export of event data requested on admin port pubkey %0x", + remote, pubkey, + ) + sto := x.Storage() + resp = &huma.StreamResponse{ + Body: func(ctx huma.Context) { + ctx.SetHeader("Content-Type", "application/nostr+jsonl") + sto.Export(x.Context(), ctx.BodyWriter()) + if f, ok := ctx.BodyWriter().(http.Flusher); ok { + f.Flush() + } else { + log.W.F("error: unable to flush") + } + }, + } + return + }, + ) +} diff --git a/pkg/protocol/openapi/huma.go b/pkg/protocol/openapi/huma.go index 5b55969..60f9723 100644 --- a/pkg/protocol/openapi/huma.go +++ b/pkg/protocol/openapi/huma.go @@ -7,14 +7,11 @@ import ( "github.com/danielgtaylor/huma/v2/adapters/humago" "orly.dev/pkg/protocol/servemux" - "orly.dev/pkg/utils/lol" ) // ExposeMiddleware adds the http.Request and http.ResponseWriter to the context // for the Operations handler. func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) { - lol.Tracer("ExposeMiddleware") - defer func() { lol.Tracer("end ExposeMiddleware") }() // Unwrap the request and response objects. r, w := humago.Unwrap(ctx) ctx = huma.WithValue(ctx, "http-request", r) @@ -27,8 +24,6 @@ func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) { func NewHuma( router *servemux.S, name, version, description string, ) (api huma.API) { - lol.Tracer("NewHuma", name, version, description) - defer func() { lol.Tracer("end NewHuma") }() config := huma.DefaultConfig(name, version) config.Info.Description = description config.DocsPath = ""