349 lines
9.1 KiB
Go
349 lines
9.1 KiB
Go
package relay
|
|
|
|
import (
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"orly.dev/pkg/database"
|
|
"orly.dev/pkg/protocol/openapi"
|
|
"orly.dev/pkg/protocol/socketapi"
|
|
|
|
"orly.dev/pkg/app/config"
|
|
"orly.dev/pkg/app/relay/helpers"
|
|
"orly.dev/pkg/app/relay/options"
|
|
"orly.dev/pkg/app/relay/publish"
|
|
"orly.dev/pkg/interfaces/relay"
|
|
"orly.dev/pkg/protocol/servemux"
|
|
"orly.dev/pkg/utils/chk"
|
|
"orly.dev/pkg/utils/context"
|
|
"orly.dev/pkg/utils/keys"
|
|
"orly.dev/pkg/utils/log"
|
|
|
|
"github.com/rs/cors"
|
|
)
|
|
|
|
// Server represents the core structure for running a nostr relay. It
|
|
// encapsulates various components such as context, cancel function, options,
|
|
// relay interface, address, HTTP server, and configuration settings.
|
|
type Server struct {
|
|
Ctx context.T
|
|
Cancel context.F
|
|
options *options.T
|
|
relay relay.I
|
|
Addr string
|
|
mux *servemux.S
|
|
httpServer *http.Server
|
|
listeners *publish.S
|
|
blacklistPubkeys [][]byte
|
|
*config.C
|
|
*Lists
|
|
*Peers
|
|
Mux *servemux.S
|
|
MetricsCollector *MetricsCollector
|
|
subscriptionCache map[string]time.Time // pubkey hex -> cache expiry time
|
|
subscriptionMutex sync.RWMutex
|
|
paymentProcessor *PaymentProcessor
|
|
}
|
|
|
|
// ServerParams represents the configuration parameters for initializing a
|
|
// server. It encapsulates various components such as context, cancel function,
|
|
// relay interface, database path, maximum limit, and configuration settings.
|
|
type ServerParams struct {
|
|
Ctx context.T
|
|
Cancel context.F
|
|
Rl relay.I
|
|
DbPath string
|
|
MaxLimit int
|
|
Mux *servemux.S
|
|
*config.C
|
|
}
|
|
|
|
// NewServer initializes and returns a new Server instance based on the provided
|
|
// ServerParams and optional settings. It sets up storage, initializes the
|
|
// relay, and configures necessary components for server operation.
|
|
//
|
|
// # Parameters
|
|
//
|
|
// - sp (*ServerParams): The configuration parameters for initializing the
|
|
// server.
|
|
//
|
|
// - opts (...options.O): Optional settings that modify the server's behavior.
|
|
//
|
|
// # Return Values
|
|
//
|
|
// - s (*Server): The newly created Server instance.
|
|
//
|
|
// - err (error): An error if any step fails during initialization.
|
|
//
|
|
// # Expected Behaviour
|
|
//
|
|
// - Initializes storage with the provided database path.
|
|
//
|
|
// - Configures the server's options using the default settings and applies any
|
|
// optional settings provided.
|
|
//
|
|
// - Sets up a ServeMux for handling HTTP requests.
|
|
//
|
|
// - Initializes the relay, starting its operation in a separate goroutine.
|
|
func NewServer(
|
|
sp *ServerParams, serveMux *servemux.S, opts ...options.O,
|
|
) (s *Server, err error) {
|
|
op := options.Default()
|
|
for _, opt := range opts {
|
|
opt(op)
|
|
}
|
|
if storage := sp.Rl.Storage(); storage != nil {
|
|
if err = storage.Init(sp.DbPath); chk.T(err) {
|
|
return nil, fmt.Errorf("storage init: %w", err)
|
|
}
|
|
}
|
|
s = &Server{
|
|
Ctx: sp.Ctx,
|
|
Cancel: sp.Cancel,
|
|
relay: sp.Rl,
|
|
mux: serveMux,
|
|
options: op,
|
|
C: sp.C,
|
|
Lists: new(Lists),
|
|
Peers: new(Peers),
|
|
subscriptionCache: make(map[string]time.Time),
|
|
}
|
|
// Parse blacklist pubkeys
|
|
for _, v := range s.C.Blacklist {
|
|
if len(v) == 0 {
|
|
continue
|
|
}
|
|
var pk []byte
|
|
if pk, err = keys.DecodeNpubOrHex(v); chk.E(err) {
|
|
continue
|
|
}
|
|
s.blacklistPubkeys = append(s.blacklistPubkeys, pk)
|
|
}
|
|
chk.E(
|
|
s.Peers.Init(sp.C.PeerRelays, sp.C.RelaySecret),
|
|
)
|
|
s.listeners = publish.New(socketapi.New(s), openapi.NewPublisher(s))
|
|
go func() {
|
|
if err := s.relay.Init(); chk.E(err) {
|
|
s.Shutdown()
|
|
}
|
|
}()
|
|
return s, nil
|
|
}
|
|
|
|
// ServeHTTP handles incoming HTTP requests according to the standard Nostr
|
|
// protocol. It specifically processes WebSocket upgrades and
|
|
// "application/nostr+json" Accept headers.
|
|
//
|
|
// # Parameters
|
|
//
|
|
// - w (http.ResponseWriter): The response writer for sending responses.
|
|
//
|
|
// - r (*http.Request): The request object containing client's details and data.
|
|
//
|
|
// # Expected Behaviour
|
|
//
|
|
// - Checks if the request URL path is "/".
|
|
//
|
|
// - For WebSocket upgrades, calls handleWebsocket method.
|
|
//
|
|
// - If "Accept" header is "application/nostr+json", calls HandleRelayInfo
|
|
// method.
|
|
//
|
|
// - Logs the HTTP request details for non-standard requests.
|
|
//
|
|
// - For all other paths, delegates to the internal mux's ServeHTTP method.
|
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
c := s.Config()
|
|
remote := helpers.GetRemoteFromReq(r)
|
|
var whitelisted bool
|
|
if len(c.Whitelist) > 0 {
|
|
for _, addr := range c.Whitelist {
|
|
if strings.HasPrefix(remote, addr) {
|
|
whitelisted = true
|
|
}
|
|
}
|
|
} else {
|
|
whitelisted = true
|
|
}
|
|
if !whitelisted {
|
|
return
|
|
}
|
|
// standard nostr protocol only governs the "root" path of the relay and
|
|
// websockets
|
|
if r.URL.Path == "/" {
|
|
if r.Header.Get("Upgrade") == "websocket" {
|
|
s.handleWebsocket(w, r)
|
|
return
|
|
}
|
|
if r.Header.Get("Accept") == "application/nostr+json" {
|
|
s.HandleRelayInfo(w, r)
|
|
return
|
|
}
|
|
}
|
|
log.T.C(
|
|
func() string {
|
|
return fmt.Sprintf(
|
|
"http request: %s from %s",
|
|
r.URL.String(), helpers.GetRemoteFromReq(r),
|
|
)
|
|
},
|
|
)
|
|
s.mux.ServeHTTP(w, r)
|
|
}
|
|
|
|
// Start initializes the server by setting up a TCP listener and serving HTTP
|
|
// requests.
|
|
//
|
|
// # Parameters
|
|
//
|
|
// - host (string): The hostname or IP address to listen on.
|
|
//
|
|
// - port (int): The port number to bind to.
|
|
//
|
|
// - started (...chan bool): Optional channels that are closed after the server
|
|
// starts successfully.
|
|
//
|
|
// # Return Values
|
|
//
|
|
// - err (error): An error if any step fails during the server startup process.
|
|
//
|
|
// # Expected Behaviour
|
|
//
|
|
// - Joins the host and port into a full address string.
|
|
//
|
|
// - Logs the intention to start the relay listener at the specified address.
|
|
//
|
|
// - Listens for TCP connections on the specified address.
|
|
//
|
|
// - Configures an HTTP server with CORS middleware, sets timeouts, and binds it
|
|
// to the listener.
|
|
//
|
|
// - If any started channels are provided, closes them upon successful startup.
|
|
//
|
|
// - Starts serving requests using the configured HTTP server.
|
|
func (s *Server) Start(
|
|
host string, port int, started ...chan bool,
|
|
) (err error) {
|
|
// Initialize payment processor if subscription is enabled
|
|
if s.C.SubscriptionEnabled && s.C.NWCUri != "" {
|
|
if db, ok := s.relay.Storage().(*database.D); ok {
|
|
if s.paymentProcessor, err = NewPaymentProcessor(s.C, db); err != nil {
|
|
log.E.F("failed to create payment processor: %v", err)
|
|
// Continue without payment processor
|
|
} else {
|
|
if err := s.paymentProcessor.Start(); err != nil {
|
|
log.E.F("failed to start payment processor: %v", err)
|
|
} else {
|
|
log.I.F("payment processor started successfully")
|
|
}
|
|
}
|
|
} else {
|
|
log.E.F("subscription enabled but storage is not database.D")
|
|
}
|
|
}
|
|
|
|
log.I.F("running spider every %v", s.C.SpiderTime)
|
|
if len(s.C.Owners) > 0 {
|
|
// start up spider
|
|
if err = s.Spider(s.C.Private); chk.E(err) {
|
|
// there wasn't any owners, or they couldn't be found on the spider
|
|
// seeds.
|
|
err = nil
|
|
}
|
|
}
|
|
// start up a spider run to trigger every 30 minutes
|
|
ticker := time.NewTicker(s.C.SpiderTime)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err = s.Spider(s.C.Private); chk.E(err) {
|
|
// there wasn't any owners, or they couldn't be found on the spider
|
|
// seeds.
|
|
err = nil
|
|
}
|
|
case <-s.Ctx.Done():
|
|
log.I.F("stopping spider ticker")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
addr := net.JoinHostPort(host, strconv.Itoa(port))
|
|
log.I.F("starting relay listener at %s", addr)
|
|
var ln net.Listener
|
|
if ln, err = net.Listen("tcp", addr); err != nil {
|
|
return err
|
|
}
|
|
s.httpServer = &http.Server{
|
|
Handler: cors.Default().Handler(s),
|
|
Addr: addr,
|
|
ReadHeaderTimeout: 7 * time.Second,
|
|
IdleTimeout: 28 * time.Second,
|
|
}
|
|
for _, startedC := range started {
|
|
close(startedC)
|
|
}
|
|
if err = s.httpServer.Serve(ln); errors.Is(err, http.ErrServerClosed) {
|
|
} else if err != nil {
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the server and its components. It ensures that
|
|
// all resources are properly released.
|
|
//
|
|
// # Expected Behaviour
|
|
//
|
|
// - Logs shutting down message.
|
|
//
|
|
// - Cancels the context to stop ongoing operations.
|
|
//
|
|
// - Closes the event store, logging the action and checking for errors.
|
|
//
|
|
// - Shuts down the HTTP server, logging the action and checking for errors.
|
|
//
|
|
// - If the relay implements ShutdownAware, it calls OnShutdown with the
|
|
// context.
|
|
func (s *Server) Shutdown() {
|
|
log.I.Ln("shutting down relay")
|
|
|
|
// Stop payment processor if running
|
|
if s.paymentProcessor != nil {
|
|
log.I.Ln("stopping payment processor")
|
|
s.paymentProcessor.Stop()
|
|
}
|
|
|
|
s.Cancel()
|
|
log.W.Ln("closing event store")
|
|
chk.E(s.relay.Storage().Close())
|
|
if s.httpServer != nil {
|
|
log.W.Ln("shutting down relay listener")
|
|
chk.E(s.httpServer.Shutdown(s.Ctx))
|
|
}
|
|
if f, ok := s.relay.(relay.ShutdownAware); ok {
|
|
f.OnShutdown(s.Ctx)
|
|
}
|
|
}
|
|
|
|
// Router retrieves and returns the HTTP ServeMux associated with the server.
|
|
//
|
|
// # Return Values
|
|
//
|
|
// - router (*http.ServeMux): The ServeMux instance used for routing HTTP
|
|
// requests.
|
|
//
|
|
// # Expected Behaviour
|
|
//
|
|
// - Returns the ServeMux that handles incoming HTTP requests to the server.
|
|
func (s *Server) Router() (router *http.ServeMux) {
|
|
return s.mux.ServeMux
|
|
}
|