Introduce ServeMux and OpenAPI export endpoint

This commit is contained in:
2025-07-23 04:10:50 +01:00
parent fab2f104ff
commit af04f89df8
8 changed files with 159 additions and 20 deletions

21
main.go
View File

@@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"orly.dev/pkg/protocol/openapi"
"orly.dev/pkg/protocol/servemux"
"os" "os"
"github.com/pkg/profile" "github.com/pkg/profile"
@@ -50,8 +52,10 @@ func main() {
}() }()
} }
c, cancel := context.Cancel(context.Bg()) c, cancel := context.Cancel(context.Bg())
storage, err := database.New(c, cancel, cfg.DataDir, cfg.DbLogLevel) var storage *database.D
if chk.E(err) { if storage, err = database.New(
c, cancel, cfg.DataDir, cfg.DbLogLevel,
); chk.E(err) {
os.Exit(1) os.Exit(1)
} }
r := &app2.Relay{C: cfg, Store: storage} r := &app2.Relay{C: cfg, Store: storage}
@@ -66,9 +70,20 @@ func main() {
C: cfg, C: cfg,
} }
var opts []options.O var opts []options.O
if server, err = relay.NewServer(serverParams, opts...); chk.E(err) { serveMux := servemux.NewServeMux()
if server, err = relay.NewServer(
serverParams, serveMux, opts...,
); chk.E(err) {
os.Exit(1) os.Exit(1)
} }
openapi.New(
server,
cfg.AppName,
version.V,
version.Description,
"/api",
serveMux,
)
if err != nil { if err != nil {
log.F.F("failed to create server: %v", err) log.F.F("failed to create server: %v", err)
} }

View File

@@ -0,0 +1,40 @@
package relay
import (
"bytes"
"net/http"
"orly.dev/pkg/protocol/httpauth"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log"
"time"
)
func (s *Server) AdminAuth(
r *http.Request, remote string,
tolerance ...time.Duration,
) (authed bool, pubkey []byte) {
var valid bool
var err error
var tolerate time.Duration
if len(tolerance) > 0 {
tolerate = tolerance[0]
}
if valid, pubkey, err = httpauth.CheckAuth(r, tolerate); chk.E(err) {
return
}
if !valid {
log.E.F(
"invalid auth %s from %s",
r.Header.Get("Authorization"), remote,
)
return
}
for _, pk := range s.ownersPubkeys {
if bytes.Equal(pk, pubkey) {
authed = true
return
}
}
return
}

View File

@@ -37,6 +37,7 @@ type Server struct {
listeners *publish.S listeners *publish.S
*config.C *config.C
*Lists *Lists
Mux *servemux.S
} }
// ServerParams represents the configuration parameters for initializing a // ServerParams represents the configuration parameters for initializing a
@@ -48,6 +49,7 @@ type ServerParams struct {
Rl relay.I Rl relay.I
DbPath string DbPath string
MaxLimit int MaxLimit int
Mux *servemux.S
*config.C *config.C
} }
@@ -78,7 +80,9 @@ type ServerParams struct {
// - Sets up a ServeMux for handling HTTP requests. // - Sets up a ServeMux for handling HTTP requests.
// //
// - Initializes the relay, starting its operation in a separate goroutine. // - Initializes the relay, starting its operation in a separate goroutine.
func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) { func NewServer(
sp *ServerParams, serveMux *servemux.S, opts ...options.O,
) (s *Server, err error) {
op := options.Default() op := options.Default()
for _, opt := range opts { for _, opt := range opts {
opt(op) opt(op)
@@ -88,7 +92,6 @@ func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) {
return nil, fmt.Errorf("storage init: %w", err) return nil, fmt.Errorf("storage init: %w", err)
} }
} }
serveMux := servemux.NewServeMux()
s = &Server{ s = &Server{
Ctx: sp.Ctx, Ctx: sp.Ctx,
Cancel: sp.Cancel, Cancel: sp.Cancel,
@@ -209,8 +212,8 @@ func (s *Server) Start(
}() }()
addr := net.JoinHostPort(host, strconv.Itoa(port)) addr := net.JoinHostPort(host, strconv.Itoa(port))
log.I.F("starting relay listener at %s", addr) log.I.F("starting relay listener at %s", addr)
ln, err := net.Listen("tcp", addr) var ln net.Listener
if err != nil { if ln, err = net.Listen("tcp", addr); err != nil {
return err return err
} }
s.httpServer = &http.Server{ s.httpServer = &http.Server{

View File

@@ -7,6 +7,7 @@ import (
"orly.dev/pkg/encoders/eventid" "orly.dev/pkg/encoders/eventid"
"orly.dev/pkg/encoders/filter" "orly.dev/pkg/encoders/filter"
"orly.dev/pkg/interfaces/store" "orly.dev/pkg/interfaces/store"
"orly.dev/pkg/protocol/servemux"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/units" "orly.dev/pkg/utils/units"
"testing" "testing"
@@ -14,6 +15,7 @@ import (
func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server { func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server {
t.Helper() t.Helper()
serveMux := servemux.NewServeMux()
srv, _ := NewServer( srv, _ := NewServer(
&ServerParams{ &ServerParams{
Ctx: c, Ctx: c,
@@ -21,6 +23,7 @@ func startTestRelay(c context.T, t *testing.T, tr *testRelay) *Server {
Rl: tr, Rl: tr,
MaxLimit: 500 * units.Kb, MaxLimit: 500 * units.Kb,
}, },
serveMux,
) )
started := make(chan bool) started := make(chan bool)
go srv.Start("127.0.0.1", 0, started) go srv.Start("127.0.0.1", 0, started)

View File

@@ -17,6 +17,8 @@ import (
// JSON. // JSON.
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) { func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
var err error var err error
evB := make([]byte, 0, units.Mb)
evBuf := bytes.NewBuffer(evB)
if len(pubkeys) == 0 { if len(pubkeys) == 0 {
if err = d.View( if err = d.View(
func(txn *badger.Txn) (err error) { func(txn *badger.Txn) (err error) {
@@ -26,25 +28,29 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
return return
} }
it := txn.NewIterator(badger.IteratorOptions{Prefix: buf.Bytes()}) it := txn.NewIterator(badger.IteratorOptions{Prefix: buf.Bytes()})
evB := make([]byte, 0, units.Mb)
defer it.Close() defer it.Close()
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
item := it.Item() item := it.Item()
if evB, err = item.ValueCopy(evB); chk.E(err) { if err = item.Value(
func(val []byte) (err error) {
evBuf.Write(val)
return
},
); chk.E(err) {
continue continue
} }
evBuf := bytes.NewBuffer(evB)
ev := event.New() ev := event.New()
if err = ev.UnmarshalBinary(evBuf); chk.E(err) { if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
continue continue
} }
// Serialize the event to JSON and write it to the output // Serialize the event to JSON and write it to the output
if _, err = w.Write(ev.Serialize()); chk.E(err) { if _, err = w.Write(ev.Serialize()); chk.E(err) {
continue return
} }
if _, err = w.Write([]byte{'\n'}); chk.E(err) { if _, err = w.Write([]byte{'\n'}); chk.E(err) {
continue return
} }
evBuf.Reset()
} }
return return
}, },
@@ -67,14 +73,17 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
return return
} }
it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()}) it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()})
evB := make([]byte, 0, units.Mb)
defer it.Close() defer it.Close()
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
item := it.Item() item := it.Item()
if evB, err = item.ValueCopy(evB); chk.E(err) { if err = item.Value(
func(val []byte) (err error) {
evBuf.Write(val)
return
},
); chk.E(err) {
continue continue
} }
evBuf := bytes.NewBuffer(evB)
ev := event.New() ev := event.New()
if err = ev.UnmarshalBinary(evBuf); chk.E(err) { if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
continue continue
@@ -86,6 +95,7 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
if _, err = w.Write([]byte{'\n'}); chk.E(err) { if _, err = w.Write([]byte{'\n'}); chk.E(err) {
continue continue
} }
evBuf.Reset()
} }
return return
}, },

View File

@@ -8,6 +8,7 @@ import (
"orly.dev/pkg/interfaces/relay" "orly.dev/pkg/interfaces/relay"
"orly.dev/pkg/interfaces/store" "orly.dev/pkg/interfaces/store"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"time"
) )
type I interface { type I interface {
@@ -22,6 +23,9 @@ type I interface {
AddEvent( AddEvent(
c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string, c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
) (accepted bool, message []byte) ) (accepted bool, message []byte)
AdminAuth(
r *http.Request, remote string, tolerance ...time.Duration,
) (authed bool, pubkey []byte)
Context() context.T Context() context.T
Publisher() *publish.S Publisher() *publish.S
Publish(c context.T, evt *event.E) (err error) Publish(c context.T, evt *event.E) (err error)

View File

@@ -0,0 +1,69 @@
package openapi
import (
"github.com/danielgtaylor/huma/v2"
"net/http"
"orly.dev/pkg/app/relay/helpers"
"orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/log"
"orly.dev/pkg/utils/lol"
)
// ExportInput is the parameters for the HTTP API Export method.
type ExportInput struct {
Auth string `header:"Authorization" doc:"nostr nip-98 (and expiring variant)" required:"true"`
}
// ExportOutput is the return value of Export. It usually will be line structured JSON. In
// future there may be more output formats.
type ExportOutput struct{ RawBody []byte }
// RegisterExport implements the Export HTTP API method.
func (x *Operations) RegisterExport(api huma.API) {
lol.Tracer("RegisterExport")
defer func() { lol.Tracer("end RegisterExport") }()
name := "Export"
description := "Export all events (only works with NIP-98 capable client, will not work with UI)"
path := x.path + "/export"
scopes := []string{"admin", "read"}
method := http.MethodGet
huma.Register(
api, huma.Operation{
OperationID: name,
Summary: name,
Path: path,
Method: method,
Tags: []string{"admin"},
Description: helpers.GenerateDescription(description, scopes),
Security: []map[string][]string{{"auth": scopes}},
}, func(ctx context.T, input *ExportInput) (
resp *huma.StreamResponse, err error,
) {
r := ctx.Value("http-request").(*http.Request)
remote := helpers.GetRemoteFromReq(r)
log.I.F("processing export from %s", remote)
authed, pubkey := x.AdminAuth(r, remote)
if !authed {
err = huma.Error401Unauthorized("Not Authorized")
return
}
log.I.F(
"%s export of event data requested on admin port pubkey %0x",
remote, pubkey,
)
sto := x.Storage()
resp = &huma.StreamResponse{
Body: func(ctx huma.Context) {
ctx.SetHeader("Content-Type", "application/nostr+jsonl")
sto.Export(x.Context(), ctx.BodyWriter())
if f, ok := ctx.BodyWriter().(http.Flusher); ok {
f.Flush()
} else {
log.W.F("error: unable to flush")
}
},
}
return
},
)
}

View File

@@ -7,14 +7,11 @@ import (
"github.com/danielgtaylor/huma/v2/adapters/humago" "github.com/danielgtaylor/huma/v2/adapters/humago"
"orly.dev/pkg/protocol/servemux" "orly.dev/pkg/protocol/servemux"
"orly.dev/pkg/utils/lol"
) )
// ExposeMiddleware adds the http.Request and http.ResponseWriter to the context // ExposeMiddleware adds the http.Request and http.ResponseWriter to the context
// for the Operations handler. // for the Operations handler.
func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) { func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) {
lol.Tracer("ExposeMiddleware")
defer func() { lol.Tracer("end ExposeMiddleware") }()
// Unwrap the request and response objects. // Unwrap the request and response objects.
r, w := humago.Unwrap(ctx) r, w := humago.Unwrap(ctx)
ctx = huma.WithValue(ctx, "http-request", r) ctx = huma.WithValue(ctx, "http-request", r)
@@ -27,8 +24,6 @@ func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) {
func NewHuma( func NewHuma(
router *servemux.S, name, version, description string, router *servemux.S, name, version, description string,
) (api huma.API) { ) (api huma.API) {
lol.Tracer("NewHuma", name, version, description)
defer func() { lol.Tracer("end NewHuma") }()
config := huma.DefaultConfig(name, version) config := huma.DefaultConfig(name, version)
config.Info.Description = description config.Info.Description = description
config.DocsPath = "" config.DocsPath = ""