Add payment processing with NWC and subscription-based access control.

- Implemented `PaymentProcessor` to handle NWC payments and extend user subscriptions.
- Added configuration options for NWC URI, subscription pricing, and enablement.
- Updated server to initialize and manage the payment processor.
This commit is contained in:
2025-09-22 17:36:05 +01:00
parent 804e1c9649
commit 7736bb7640
4 changed files with 232 additions and 30 deletions

View File

@@ -23,25 +23,28 @@ import (
// and default values. It defines parameters for app behaviour, storage // and default values. It defines parameters for app behaviour, storage
// locations, logging, and network settings used across the relay service. // locations, logging, and network settings used across the relay service.
type C struct { type C struct {
AppName string `env:"ORLY_APP_NAME" usage:"set a name to display on information about the relay" default:"ORLY"` AppName string `env:"ORLY_APP_NAME" usage:"set a name to display on information about the relay" default:"ORLY"`
DataDir string `env:"ORLY_DATA_DIR" usage:"storage location for the event store" default:"~/.local/share/ORLY"` DataDir string `env:"ORLY_DATA_DIR" usage:"storage location for the event store" default:"~/.local/share/ORLY"`
Listen string `env:"ORLY_LISTEN" default:"0.0.0.0" usage:"network listen address"` Listen string `env:"ORLY_LISTEN" default:"0.0.0.0" usage:"network listen address"`
Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"` Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"`
HealthPort int `env:"ORLY_HEALTH_PORT" default:"0" usage:"optional health check HTTP port; 0 disables"` HealthPort int `env:"ORLY_HEALTH_PORT" default:"0" usage:"optional health check HTTP port; 0 disables"`
EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"` EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"` LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"` DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"` LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"` Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"` PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"`
PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"` PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"`
OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"` OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"`
IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"` IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"` Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"`
ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"` ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"`
SpiderMode string `env:"ORLY_SPIDER_MODE" usage:"spider mode: none,follow" default:"none"` SpiderMode string `env:"ORLY_SPIDER_MODE" usage:"spider mode: none,follow" default:"none"`
SpiderFrequency time.Duration `env:"ORLY_SPIDER_FREQUENCY" usage:"spider frequency in seconds" default:"1h"` SpiderFrequency time.Duration `env:"ORLY_SPIDER_FREQUENCY" usage:"spider frequency in seconds" default:"1h"`
NWCUri string `env:"ORLY_NWC_URI" usage:"NWC (Nostr Wallet Connect) connection string for Lightning payments"`
SubscriptionEnabled bool `env:"ORLY_SUBSCRIPTION_ENABLED" default:"false" usage:"enable subscription-based access control requiring payment for non-directory events"`
MonthlyPriceSats int64 `env:"ORLY_MONTHLY_PRICE_SATS" default:"6000" usage:"price in satoshis for one month subscription (default ~$2 USD)"`
// Web UI and dev mode settings // Web UI and dev mode settings
WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"` WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`

View File

@@ -8,7 +8,7 @@ import (
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/log" "lol.mleku.dev/log"
"next.orly.dev/app/config" "next.orly.dev/app/config"
database "next.orly.dev/pkg/database" "next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/protocol/publish"
) )
@@ -47,6 +47,16 @@ func Run(
} }
// Initialize the user interface // Initialize the user interface
l.UserInterface() l.UserInterface()
if l.paymentProcessor, err = NewPaymentProcessor(ctx, cfg, db); err != nil {
log.E.F("failed to create payment processor: %v", err)
// Continue without payment processor
} else {
if err = l.paymentProcessor.Start(); err != nil {
log.E.F("failed to start payment processor: %v", err)
} else {
log.I.F("payment processor started successfully")
}
}
addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port)
log.I.F("starting listener on http://%s", addr) log.I.F("starting listener on http://%s", addr)
go func() { go func() {

184
app/payment_processor.go Normal file
View File

@@ -0,0 +1,184 @@
package app
import (
"context"
"fmt"
"strings"
"sync"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/protocol/nwc"
)
// PaymentProcessor handles NWC payment notifications and updates subscriptions
type PaymentProcessor struct {
nwcClient *nwc.Client
db *database.D
config *config.C
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewPaymentProcessor creates a new payment processor
func NewPaymentProcessor(
ctx context.Context, cfg *config.C, db *database.D,
) (pp *PaymentProcessor, err error) {
if cfg.NWCUri == "" {
return nil, fmt.Errorf("NWC URI not configured")
}
var nwcClient *nwc.Client
if nwcClient, err = nwc.NewClient(cfg.NWCUri); chk.E(err) {
return nil, fmt.Errorf("failed to create NWC client: %w", err)
}
c, cancel := context.WithCancel(ctx)
pp = &PaymentProcessor{
nwcClient: nwcClient,
db: db,
config: cfg,
ctx: c,
cancel: cancel,
}
return pp, nil
}
// Start begins listening for payment notifications
func (pp *PaymentProcessor) Start() error {
pp.wg.Add(1)
go func() {
defer pp.wg.Done()
if err := pp.listenForPayments(); err != nil {
log.E.F("payment processor error: %v", err)
}
}()
return nil
}
// Stop gracefully stops the payment processor
func (pp *PaymentProcessor) Stop() {
if pp.cancel != nil {
pp.cancel()
}
pp.wg.Wait()
}
// listenForPayments subscribes to NWC notifications and processes payments
func (pp *PaymentProcessor) listenForPayments() error {
return pp.nwcClient.SubscribeNotifications(pp.ctx, pp.handleNotification)
}
// handleNotification processes incoming payment notifications
func (pp *PaymentProcessor) handleNotification(
notificationType string, notification map[string]any,
) error {
// Only process payment_received notifications
if notificationType != "payment_received" {
return nil
}
amount, ok := notification["amount"].(float64)
if !ok {
return fmt.Errorf("invalid amount")
}
description, _ := notification["description"].(string)
userNpub := pp.extractNpubFromDescription(description)
if userNpub == "" {
if metadata, ok := notification["metadata"].(map[string]any); ok {
if npubField, ok := metadata["npub"].(string); ok {
userNpub = npubField
}
}
}
if userNpub == "" {
return fmt.Errorf("no npub in payment description")
}
pubkey, err := pp.npubToPubkey(userNpub)
if err != nil {
return fmt.Errorf("invalid npub: %w", err)
}
satsReceived := int64(amount / 1000)
monthlyPrice := pp.config.MonthlyPriceSats
if monthlyPrice <= 0 {
monthlyPrice = 6000
}
days := int((float64(satsReceived) / float64(monthlyPrice)) * 30)
if days < 1 {
return fmt.Errorf("payment amount too small")
}
if err := pp.db.ExtendSubscription(pubkey, days); err != nil {
return fmt.Errorf("failed to extend subscription: %w", err)
}
// Record payment history
invoice, _ := notification["invoice"].(string)
preimage, _ := notification["preimage"].(string)
if err := pp.db.RecordPayment(
pubkey, satsReceived, invoice, preimage,
); err != nil {
log.E.F("failed to record payment: %v", err)
}
log.I.F(
"payment processed: %s %d sats -> %d days", userNpub, satsReceived,
days,
)
return nil
}
// extractNpubFromDescription extracts an npub from the payment description
func (pp *PaymentProcessor) extractNpubFromDescription(description string) string {
// Look for npub1... pattern in the description
parts := strings.Fields(description)
for _, part := range parts {
if strings.HasPrefix(part, "npub1") && len(part) == 63 {
return part
}
}
// Also check if the entire description is just an npub
description = strings.TrimSpace(description)
if strings.HasPrefix(description, "npub1") && len(description) == 63 {
return description
}
return ""
}
// npubToPubkey converts an npub string to pubkey bytes
func (pp *PaymentProcessor) npubToPubkey(npubStr string) ([]byte, error) {
// Validate npub format
if !strings.HasPrefix(npubStr, "npub1") || len(npubStr) != 63 {
return nil, fmt.Errorf("invalid npub format")
}
// Decode using bech32encoding
prefix, value, err := bech32encoding.Decode([]byte(npubStr))
if err != nil {
return nil, fmt.Errorf("failed to decode npub: %w", err)
}
if !strings.EqualFold(string(prefix), "npub") {
return nil, fmt.Errorf("invalid prefix: %s", string(prefix))
}
pubkey, ok := value.([]byte)
if !ok {
return nil, fmt.Errorf("decoded value is not []byte")
}
return pubkey, nil
}

View File

@@ -40,6 +40,8 @@ type Server struct {
// Challenge storage for HTTP UI authentication // Challenge storage for HTTP UI authentication
challengeMutex sync.RWMutex challengeMutex sync.RWMutex
challenges map[string][]byte challenges map[string][]byte
paymentProcessor *PaymentProcessor
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -406,13 +408,14 @@ func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-ndjson") w.Header().Set("Content-Type", "application/x-ndjson")
filename := "events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl" filename := "events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"
w.Header().Set("Content-Disposition", "attachment; filename=\""+filename+"\"") w.Header().Set(
"Content-Disposition", "attachment; filename=\""+filename+"\"",
)
// Stream export // Stream export
s.D.Export(s.Ctx, w, pks...) s.D.Export(s.Ctx, w, pks...)
} }
// handleExportMine streams only the authenticated user's events as JSONL (NDJSON). // handleExportMine streams only the authenticated user's events as JSONL (NDJSON).
func (s *Server) handleExportMine(w http.ResponseWriter, r *http.Request) { func (s *Server) handleExportMine(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet { if r.Method != http.MethodGet {
@@ -434,7 +437,9 @@ func (s *Server) handleExportMine(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-ndjson") w.Header().Set("Content-Type", "application/x-ndjson")
filename := "my-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl" filename := "my-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"
w.Header().Set("Content-Disposition", "attachment; filename=\""+filename+"\"") w.Header().Set(
"Content-Disposition", "attachment; filename=\""+filename+"\"",
)
// Stream export for this user's pubkey only // Stream export for this user's pubkey only
s.D.Export(s.Ctx, w, pubkey) s.D.Export(s.Ctx, w, pubkey)
@@ -482,7 +487,7 @@ func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Empty request body", http.StatusBadRequest) http.Error(w, "Empty request body", http.StatusBadRequest)
return return
} }
s.D.Import(r.Body) s.D.Import(r.Body)
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@@ -552,7 +557,7 @@ func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) {
} }
// Events are already sorted by QueryEvents in reverse chronological order // Events are already sorted by QueryEvents in reverse chronological order
// Apply offset and limit manually since QueryEvents doesn't support offset // Apply offset and limit manually since QueryEvents doesn't support offset
totalEvents := len(events) totalEvents := len(events)
start := offset start := offset
@@ -568,11 +573,11 @@ func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) {
// Convert events to JSON response format // Convert events to JSON response format
type EventResponse struct { type EventResponse struct {
ID string `json:"id"` ID string `json:"id"`
Kind int `json:"kind"` Kind int `json:"kind"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`
Content string `json:"content"` Content string `json:"content"`
RawJSON string `json:"raw_json"` RawJSON string `json:"raw_json"`
} }
response := struct { response := struct {