next.orly.dev/app/main.go
woikos e6fa2f15e4
Add persistent keyset storage for Cashu tokens (v0.44.4)
- Add FileStore implementation for keyset persistence
- Keysets now survive server restarts
- Store keysets in JSON file at $ORLY_DATA_DIR/cashu-keysets.json
- Tokens issued before restart remain valid

Files modified:
- pkg/cashu/keyset/file_store.go: New file-based keyset store
- app/main.go: Use FileStore instead of MemoryStore

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 15:37:16 +01:00

package app

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "path/filepath"
    "sync"
    "time"

    "git.mleku.dev/mleku/nostr/crypto/keys"
    "git.mleku.dev/mleku/nostr/encoders/bech32encoding"
    "git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
    "golang.org/x/crypto/acme/autocert"
    "lol.mleku.dev/chk"
    "lol.mleku.dev/log"
    "next.orly.dev/app/config"
    "next.orly.dev/pkg/acl"
    "next.orly.dev/pkg/bunker"
    "next.orly.dev/pkg/cashu/issuer"
    "next.orly.dev/pkg/cashu/keyset"
    "next.orly.dev/pkg/cashu/verifier"
    "next.orly.dev/pkg/database"
    cashuiface "next.orly.dev/pkg/interfaces/cashu"
    "next.orly.dev/pkg/neo4j"
    "next.orly.dev/pkg/policy"
    "next.orly.dev/pkg/protocol/graph"
    "next.orly.dev/pkg/protocol/nip43"
    "next.orly.dev/pkg/protocol/publish"
    "next.orly.dev/pkg/ratelimit"
    "next.orly.dev/pkg/spider"
    dsync "next.orly.dev/pkg/sync"
    "next.orly.dev/pkg/wireguard"
)
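
// Run wires up the relay's subsystems - ACL admin and owner keys, NIP-43
// invites, policy, graph query executors, Cashu access tokens, spiders,
// sync and cluster replication, Blossom, WireGuard/bunker, and payments -
// then starts the HTTP or TLS listeners. The returned channel is closed
// once ctx is cancelled and shutdown has begun.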
func Run(
    ctx context.Context, cfg *config.C, db database.Database, limiter *ratelimit.Limiter,
) (quit chan struct{}) {
    quit = make(chan struct{})
    var once sync.Once
    // shutdown handler
    go func() {
        <-ctx.Done()
        log.I.F("shutting down")
        once.Do(func() { close(quit) })
    }()
    // get the admins
    var err error
    var adminKeys [][]byte
    for _, admin := range cfg.Admins {
        if len(admin) == 0 {
            continue
        }
        var pk []byte
        if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
            continue
        }
        adminKeys = append(adminKeys, pk)
    }
    // get the owners
    var ownerKeys [][]byte
    for _, owner := range cfg.Owners {
        if len(owner) == 0 {
            continue
        }
        var pk []byte
        if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(owner); chk.E(err) {
            continue
        }
        ownerKeys = append(ownerKeys, pk)
    }
    // start listener
    l := &Server{
        Ctx:         ctx,
        Config:      cfg,
        DB:          db,
        publishers:  publish.New(NewPublisher(ctx)),
        Admins:      adminKeys,
        Owners:      ownerKeys,
        rateLimiter: limiter,
        cfg:         cfg,
        db:          db,
    }
    // Initialize NIP-43 invite manager if enabled
    if cfg.NIP43Enabled {
        l.InviteManager = nip43.NewInviteManager(cfg.NIP43InviteExpiry)
        log.I.F("NIP-43 invite system enabled with %v expiry", cfg.NIP43InviteExpiry)
    }
    // Initialize sprocket manager
    l.sprocketManager = NewSprocketManager(ctx, cfg.AppName, cfg.SprocketEnabled)
    // Initialize policy manager
    l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled, cfg.PolicyPath)
    // Merge policy-defined owners with environment-defined owners. This allows cloud
    // deployments to add owners via policy.json when env vars cannot be modified.
    if l.policyManager != nil {
        policyOwners := l.policyManager.GetOwnersBin()
        if len(policyOwners) > 0 {
            // Deduplicate when merging
            existingOwners := make(map[string]struct{})
            for _, owner := range l.Owners {
                existingOwners[string(owner)] = struct{}{}
            }
            for _, policyOwner := range policyOwners {
                if _, exists := existingOwners[string(policyOwner)]; !exists {
                    l.Owners = append(l.Owners, policyOwner)
                    existingOwners[string(policyOwner)] = struct{}{}
                }
            }
            log.I.F("merged %d policy-defined owners with %d environment-defined owners (total: %d unique owners)",
                len(policyOwners), len(ownerKeys), len(l.Owners))
        }
    }
    // Initialize policy follows from the database (load follow lists of policy admins).
    // This must happen after policy manager initialization but before accepting connections.
    if err := l.InitializePolicyFollows(); err != nil {
        log.W.F("failed to initialize policy follows: %v", err)
        // Continue anyway - follows can be loaded when admins update their follow lists
    }
    // Clean up any kind 3 events that lost their p tags (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        if err := badgerDB.CleanupKind3WithoutPTags(ctx); chk.E(err) {
            log.E.F("failed to cleanup kind 3 events: %v", err)
        }
    }
    // Initialize graph query executor (Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        // Get the relay identity key for signing graph query responses
        relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
        if err != nil {
            log.E.F("failed to get relay identity key for graph executor: %v", err)
        } else {
            // Create the graph adapter and executor
            graphAdapter := database.NewGraphAdapter(badgerDB)
            if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
                log.E.F("failed to create graph executor: %v", err)
            } else {
                log.I.F("graph query executor initialized (Badger backend)")
            }
        }
    }
    // Initialize graph query executor (Neo4j backend)
    if neo4jDB, ok := db.(*neo4j.N); ok {
        // Get the relay identity key for signing graph query responses
        relaySecretKey, err := neo4jDB.GetOrCreateRelayIdentitySecret()
        if err != nil {
            log.E.F("failed to get relay identity key for graph executor: %v", err)
        } else {
            // Create the graph adapter and executor
            graphAdapter := neo4j.NewGraphAdapter(neo4jDB)
            if l.graphExecutor, err = graph.NewExecutor(graphAdapter, relaySecretKey); err != nil {
                log.E.F("failed to create graph executor: %v", err)
            } else {
                log.I.F("graph query executor initialized (Neo4j backend)")
            }
        }
    }
    // Initialize Cashu access token system when ACL is active
    if cfg.ACLMode != "none" {
        // Create keyset manager with file-based store (keysets persist across restarts)
        keysetPath := filepath.Join(cfg.DataDir, "cashu-keysets.json")
        keysetStore, err := keyset.NewFileStore(keysetPath)
        if err != nil {
            log.E.F("failed to create Cashu keyset store at %s: %v", keysetPath, err)
        } else {
            keysetManager := keyset.NewManager(keysetStore, keyset.DefaultActiveWindow, keyset.DefaultVerifyWindow)
            // Initialize keyset manager (loads existing keysets or creates a new one)
            if err := keysetManager.Init(); err != nil {
                log.E.F("failed to initialize Cashu keyset manager: %v", err)
            } else {
                // Create issuer with permissive checker (ACL handles authorization)
                issuerCfg := issuer.DefaultConfig()
                l.CashuIssuer = issuer.New(keysetManager, cashuiface.AllowAllChecker{}, issuerCfg)
                // Create verifier for validating tokens
                l.CashuVerifier = verifier.New(keysetManager, cashuiface.AllowAllChecker{}, verifier.DefaultConfig())
                log.I.F("Cashu access token system enabled (ACL mode: %s, keysets: %s)", cfg.ACLMode, keysetPath)
            }
        }
    }
    // Initialize spider manager based on mode (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok && cfg.SpiderMode != "none" {
        if l.spiderManager, err = spider.New(ctx, badgerDB, l.publishers, cfg.SpiderMode); chk.E(err) {
            log.E.F("failed to create spider manager: %v", err)
        } else {
            // Set up callbacks for follows mode
            if cfg.SpiderMode == "follows" {
                l.spiderManager.SetCallbacks(
                    func() []string {
                        // Get admin relays from follows ACL if available
                        for _, aclInstance := range acl.Registry.ACL {
                            if aclInstance.Type() == "follows" {
                                if follows, ok := aclInstance.(*acl.Follows); ok {
                                    return follows.AdminRelays()
                                }
                            }
                        }
                        return nil
                    },
                    func() [][]byte {
                        // Get followed pubkeys from follows ACL if available
                        for _, aclInstance := range acl.Registry.ACL {
                            if aclInstance.Type() == "follows" {
                                if follows, ok := aclInstance.(*acl.Follows); ok {
                                    return follows.GetFollowedPubkeys()
                                }
                            }
                        }
                        return nil
                    },
                )
            }
            if err = l.spiderManager.Start(); chk.E(err) {
                log.E.F("failed to start spider manager: %v", err)
            } else {
                log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
                // Hook up follow list update notifications from ACL to spider
                if cfg.SpiderMode == "follows" {
                    for _, aclInstance := range acl.Registry.ACL {
                        if aclInstance.Type() == "follows" {
                            if follows, ok := aclInstance.(*acl.Follows); ok {
                                follows.SetFollowListUpdateCallback(func() {
                                    log.I.F("follow list updated, notifying spider")
                                    l.spiderManager.NotifyFollowListUpdate()
                                })
                                log.I.F("spider: follow list update notifications configured")
                            }
                        }
                    }
                }
            }
        }
    }
    // Initialize directory spider if enabled (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok && cfg.DirectorySpiderEnabled {
        if l.directorySpider, err = spider.NewDirectorySpider(
            ctx,
            badgerDB,
            l.publishers,
            cfg.DirectorySpiderInterval,
            cfg.DirectorySpiderMaxHops,
        ); chk.E(err) {
            log.E.F("failed to create directory spider: %v", err)
        } else {
            // Set up callback to get seed pubkeys (whitelisted users)
            l.directorySpider.SetSeedCallback(func() [][]byte {
                var pubkeys [][]byte
                // Get followed pubkeys from follows ACL if available
                for _, aclInstance := range acl.Registry.ACL {
                    if aclInstance.Type() == "follows" {
                        if follows, ok := aclInstance.(*acl.Follows); ok {
                            pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
                        }
                    }
                }
                // Fall back to admin keys if no follows ACL
                if len(pubkeys) == 0 {
                    pubkeys = adminKeys
                }
                return pubkeys
            })
            if err = l.directorySpider.Start(); chk.E(err) {
                log.E.F("failed to start directory spider: %v", err)
            } else {
                log.I.F("directory spider started (interval: %v, max hops: %d)",
                    cfg.DirectorySpiderInterval, cfg.DirectorySpiderMaxHops)
            }
        }
    }
    // Initialize relay group manager (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        l.relayGroupMgr = dsync.NewRelayGroupManager(badgerDB, cfg.RelayGroupAdmins)
    } else if cfg.SpiderMode != "none" || len(cfg.RelayPeers) > 0 || len(cfg.ClusterAdmins) > 0 {
        log.I.Ln("spider, sync, and cluster features require Badger backend (currently using alternative backend)")
    }
    // Initialize sync manager if relay peers are configured (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        var peers []string
        if len(cfg.RelayPeers) > 0 {
            peers = cfg.RelayPeers
        } else if l.relayGroupMgr != nil {
            // Try to get peers from relay group configuration
            if groupConfig, err := l.relayGroupMgr.FindAuthoritativeConfig(ctx); err == nil && groupConfig != nil {
                peers = groupConfig.Relays
                log.I.F("using relay group configuration with %d peers", len(peers))
            }
        }
        if len(peers) > 0 {
            // Get relay identity for node ID
            sk, err := db.GetOrCreateRelayIdentitySecret()
            if err != nil {
                log.E.F("failed to get relay identity for sync: %v", err)
            } else {
                nodeID, err := keys.SecretBytesToPubKeyHex(sk)
                if err != nil {
                    log.E.F("failed to derive pubkey for sync node ID: %v", err)
                } else {
                    relayURL := cfg.RelayURL
                    if relayURL == "" {
                        relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port)
                    }
                    l.syncManager = dsync.NewManager(ctx, badgerDB, nodeID, relayURL, peers, l.relayGroupMgr, l.policyManager)
                    log.I.F("distributed sync manager initialized with %d peers", len(peers))
                }
            }
        }
    }
    // Initialize cluster manager for cluster replication (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        var clusterAdminNpubs []string
        if len(cfg.ClusterAdmins) > 0 {
            clusterAdminNpubs = cfg.ClusterAdmins
        } else {
            // Default to regular admins if no cluster admins specified
            clusterAdminNpubs = append(clusterAdminNpubs, cfg.Admins...)
        }
        if len(clusterAdminNpubs) > 0 {
            l.clusterManager = dsync.NewClusterManager(ctx, badgerDB, clusterAdminNpubs, cfg.ClusterPropagatePrivilegedEvents, l.publishers)
            l.clusterManager.Start()
            log.I.F("cluster replication manager initialized with %d admin npubs", len(clusterAdminNpubs))
        }
    }
    // Initialize Blossom blob storage server (only for Badger backend)
    // MUST be done before UserInterface() which registers routes
    if badgerDB, ok := db.(*database.D); ok {
        log.I.F("Badger backend detected, initializing Blossom server...")
        if l.blossomServer, err = initializeBlossomServer(ctx, cfg, badgerDB); err != nil {
            log.E.F("failed to initialize blossom server: %v", err)
            // Continue without blossom server
        } else if l.blossomServer != nil {
            log.I.F("blossom blob storage server initialized")
        } else {
            log.W.F("blossom server initialization returned nil without error")
        }
    } else {
        log.I.F("Non-Badger backend detected (type: %T), Blossom server not available", db)
    }
    // Initialize WireGuard VPN and NIP-46 Bunker (only for Badger backend)
    // Requires ACL mode 'follows' or 'managed' - no point for open relays
    if badgerDB, ok := db.(*database.D); ok && cfg.WGEnabled && cfg.ACLMode != "none" {
        if cfg.WGEndpoint == "" {
            log.E.F("WireGuard enabled but ORLY_WG_ENDPOINT not set - skipping")
        } else {
            // Get or create the subnet pool (restores seed and allocations from DB)
            subnetPool, err := badgerDB.GetOrCreateSubnetPool(cfg.WGNetwork)
            if err != nil {
                log.E.F("failed to create subnet pool: %v", err)
            } else {
                l.subnetPool = subnetPool
                // Get or create WireGuard server key
                wgServerKey, err := badgerDB.GetOrCreateWireGuardServerKey()
                if err != nil {
                    log.E.F("failed to get WireGuard server key: %v", err)
                } else {
                    // Create WireGuard server
                    wgConfig := &wireguard.Config{
                        Port:       cfg.WGPort,
                        Endpoint:   cfg.WGEndpoint,
                        PrivateKey: wgServerKey,
                        Network:    cfg.WGNetwork,
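                        // The server's in-tunnel address is fixed; the NIP-46
                        // bunker below listens on this same address when enabled.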
                        ServerIP:   "10.73.0.1",
                    }
                    l.wireguardServer, err = wireguard.New(wgConfig)
                    if err != nil {
                        log.E.F("failed to create WireGuard server: %v", err)
                    } else {
                        if err = l.wireguardServer.Start(); err != nil {
                            log.E.F("failed to start WireGuard server: %v", err)
                        } else {
                            log.I.F("WireGuard VPN server started on UDP port %d", cfg.WGPort)
                            // Load existing peers from database and add to server
                            peers, err := badgerDB.GetAllWireGuardPeers()
                            if err != nil {
                                log.W.F("failed to load existing WireGuard peers: %v", err)
                            } else {
                                for _, peer := range peers {
                                    // Derive client IP from sequence
                                    subnet := subnetPool.SubnetForSequence(peer.Sequence)
                                    clientIP := subnet.ClientIP.String()
                                    if err := l.wireguardServer.AddPeer(peer.NostrPubkey, peer.WGPublicKey, clientIP); err != nil {
                                        log.W.F("failed to add existing peer: %v", err)
                                    }
                                }
                                if len(peers) > 0 {
                                    log.I.F("loaded %d existing WireGuard peers", len(peers))
                                }
                            }
                            // Initialize bunker if enabled
                            if cfg.BunkerEnabled {
                                // Get relay identity for signing
                                relaySecretKey, err := badgerDB.GetOrCreateRelayIdentitySecret()
                                if err != nil {
                                    log.E.F("failed to get relay identity for bunker: %v", err)
                                } else {
                                    // Create signer from secret key
                                    relaySigner, sigErr := p8k.New()
                                    if sigErr != nil {
                                        log.E.F("failed to create signer for bunker: %v", sigErr)
                                    } else if sigErr = relaySigner.InitSec(relaySecretKey); sigErr != nil {
                                        log.E.F("failed to init signer for bunker: %v", sigErr)
                                    } else {
                                        relayPubkey := relaySigner.Pub()
                                        bunkerConfig := &bunker.Config{
                                            RelaySigner: relaySigner,
                                            RelayPubkey: relayPubkey[:],
                                            Netstack:    l.wireguardServer.GetNetstack(),
                                            ListenAddr:  fmt.Sprintf("10.73.0.1:%d", cfg.BunkerPort),
                                        }
                                        l.bunkerServer = bunker.New(bunkerConfig)
                                        if err = l.bunkerServer.Start(); err != nil {
                                            log.E.F("failed to start bunker server: %v", err)
                                        } else {
                                            log.I.F("NIP-46 bunker server started on 10.73.0.1:%d (WireGuard only)", cfg.BunkerPort)
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    } else if cfg.WGEnabled && cfg.ACLMode == "none" {
        log.I.F("WireGuard disabled: requires ACL mode 'follows' or 'managed' (currently: 'none')")
    }
    // Initialize event domain services (validation, routing, processing)
    l.InitEventServices()
    // Initialize the user interface (registers routes)
    l.UserInterface()
    // Ensure a relay identity secret key exists when subscriptions and NWC are enabled
    if cfg.SubscriptionEnabled && cfg.NWCUri != "" {
        if skb, e := db.GetOrCreateRelayIdentitySecret(); e != nil {
            log.E.F("failed to ensure relay identity key: %v", e)
        } else if pk, e2 := keys.SecretBytesToPubKeyHex(skb); e2 == nil {
            log.I.F("relay identity loaded (pub=%s)", pk)
            // Ensure the relay identity pubkey is considered an admin for ACL follows mode
            found := false
            for _, a := range cfg.Admins {
                if a == pk {
                    found = true
                    break
                }
            }
            if !found {
                cfg.Admins = append(cfg.Admins, pk)
                log.I.F("added relay identity to admins for follow-list whitelisting")
            }
            // Also ensure the relay identity pubkey is considered an owner for full control
            found = false
            for _, o := range cfg.Owners {
                if o == pk {
                    found = true
                    break
                }
            }
            if !found {
                cfg.Owners = append(cfg.Owners, pk)
                log.I.F("added relay identity to owners for full control")
            }
        }
    }
    // Initialize payment processor (only for Badger backend)
    if badgerDB, ok := db.(*database.D); ok {
        if l.paymentProcessor, err = NewPaymentProcessor(ctx, cfg, badgerDB); err != nil {
            // Continue without the payment processor; creation errors are intentionally not logged
        } else {
            if err = l.paymentProcessor.Start(); err != nil {
                log.E.F("failed to start payment processor: %v", err)
            } else {
                log.I.F("payment processor started successfully")
            }
        }
    }
    // Start rate limiter if enabled
    if limiter != nil && limiter.IsEnabled() {
        limiter.Start()
        log.I.F("adaptive rate limiter started")
    }
    // Wait for database to be ready before accepting requests
    log.I.F("waiting for database warmup to complete...")
    <-db.Ready()
    log.I.F("database ready, starting HTTP servers")
    // Check if TLS is enabled
    var tlsEnabled bool
    var tlsServer *http.Server
    var httpServer *http.Server
    if len(cfg.TLSDomains) > 0 {
        // Validate TLS configuration
        if err = ValidateTLSConfig(cfg.TLSDomains, cfg.Certs); chk.E(err) {
            log.E.F("invalid TLS configuration: %v", err)
        } else {
            tlsEnabled = true
            log.I.F("TLS enabled for domains: %v", cfg.TLSDomains)
            // Create cache directory for autocert
            cacheDir := filepath.Join(cfg.DataDir, "autocert")
            if err = os.MkdirAll(cacheDir, 0700); chk.E(err) {
                log.E.F("failed to create autocert cache directory: %v", err)
                tlsEnabled = false
            } else {
                // Set up autocert manager
                m := &autocert.Manager{
                    Prompt:     autocert.AcceptTOS,
                    Cache:      autocert.DirCache(cacheDir),
                    HostPolicy: autocert.HostWhitelist(cfg.TLSDomains...),
                }
                // Create TLS server on port 443
                tlsServer = &http.Server{
                    Addr:      ":443",
                    Handler:   l,
                    TLSConfig: TLSConfig(m, cfg.Certs...),
                }
                // Create HTTP server for ACME challenges and redirects on port 80
                httpServer = &http.Server{
                    Addr:    ":80",
                    Handler: m.HTTPHandler(nil),
                }
                // Start TLS server
                go func() {
                    log.I.F("starting TLS listener on https://:443")
                    if err := tlsServer.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
                        log.E.F("TLS server error: %v", err)
                    }
                }()
                // Start HTTP server for ACME challenges
                go func() {
                    log.I.F("starting HTTP listener on http://:80 for ACME challenges")
                    if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                        log.E.F("HTTP server error: %v", err)
                    }
                }()
            }
        }
    }
    // Start regular HTTP server if TLS is not enabled or as fallback
    if !tlsEnabled {
        addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port)
        log.I.F("starting listener on http://%s", addr)
        httpServer = &http.Server{
            Addr:    addr,
            Handler: l,
        }
        go func() {
            if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                log.E.F("HTTP server error: %v", err)
            }
        }()
    }
    // Graceful shutdown handler
    go func() {
        <-ctx.Done()
        log.I.F("shutting down servers gracefully")
        // Stop spider manager if running
        if l.spiderManager != nil {
            l.spiderManager.Stop()
            log.I.F("spider manager stopped")
        }
        // Stop directory spider if running
        if l.directorySpider != nil {
            l.directorySpider.Stop()
            log.I.F("directory spider stopped")
        }
        // Stop rate limiter if running
        if l.rateLimiter != nil && l.rateLimiter.IsEnabled() {
            l.rateLimiter.Stop()
            log.I.F("rate limiter stopped")
        }
        // Stop bunker server if running
        if l.bunkerServer != nil {
            l.bunkerServer.Stop()
            log.I.F("bunker server stopped")
        }
        // Stop WireGuard server if running
        if l.wireguardServer != nil {
            l.wireguardServer.Stop()
            log.I.F("WireGuard server stopped")
        }
        // Create shutdown context with timeout
        shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancelShutdown()
        // Shut down TLS server if running
        if tlsServer != nil {
            if err := tlsServer.Shutdown(shutdownCtx); err != nil {
                log.E.F("TLS server shutdown error: %v", err)
            } else {
                log.I.F("TLS server shutdown completed")
            }
        }
        // Shut down HTTP server
        if httpServer != nil {
            if err := httpServer.Shutdown(shutdownCtx); err != nil {
                log.E.F("HTTP server shutdown error: %v", err)
            } else {
                log.I.F("HTTP server shutdown completed")
            }
        }
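        // Both this handler and the early shutdown goroutine close quit;
        // sync.Once prevents a double-close panic.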
        once.Do(func() { close(quit) })
    }()
    return
}