diff --git a/app/blossom.go b/app/blossom.go new file mode 100644 index 0000000..6aa7d3a --- /dev/null +++ b/app/blossom.go @@ -0,0 +1,53 @@ +package app + +import ( + "context" + "net/http" + "strings" + + "lol.mleku.dev/log" + "next.orly.dev/app/config" + "next.orly.dev/pkg/acl" + "next.orly.dev/pkg/database" + blossom "next.orly.dev/pkg/blossom" +) + +// initializeBlossomServer creates and configures the Blossom blob storage server +func initializeBlossomServer( + ctx context.Context, cfg *config.C, db *database.D, +) (*blossom.Server, error) { + // Create blossom server configuration + blossomCfg := &blossom.Config{ + BaseURL: "", // Will be set dynamically per request + MaxBlobSize: 100 * 1024 * 1024, // 100MB default + AllowedMimeTypes: nil, // Allow all MIME types by default + RequireAuth: cfg.AuthRequired || cfg.AuthToWrite, + } + + // Create blossom server with relay's ACL registry + bs := blossom.NewServer(db, acl.Registry, blossomCfg) + + // Override baseURL getter to use request-based URL + // We'll need to modify the handler to inject the baseURL per request + // For now, we'll use a middleware approach + + log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode) + return bs, nil +} + +// blossomHandler wraps the blossom server handler to inject baseURL per request +func (s *Server) blossomHandler(w http.ResponseWriter, r *http.Request) { + // Strip /blossom prefix and pass to blossom handler + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/blossom") + if !strings.HasPrefix(r.URL.Path, "/") { + r.URL.Path = "/" + r.URL.Path + } + + // Set baseURL in request context for blossom server to use + baseURL := s.ServiceURL(r) + "/blossom" + type baseURLKey struct{} + r = r.WithContext(context.WithValue(r.Context(), baseURLKey{}, baseURL)) + + s.blossomServer.Handler().ServeHTTP(w, r) +} + diff --git a/app/config/config.go b/app/config/config.go index 191975c..06c1957 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -52,6 +52,9 @@ type C struct { RelayAddresses []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com)"` FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"` + // Blossom blob storage service level settings + BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"` + // Web UI and dev mode settings WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"` WebDevProxyURL string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"` diff --git a/app/main.go b/app/main.go index dde63e9..cf48fb4 100644 --- a/app/main.go +++ b/app/main.go @@ -119,6 +119,14 @@ func Run( // Initialize the user interface l.UserInterface() + // Initialize Blossom blob storage server + if l.blossomServer, err = initializeBlossomServer(ctx, cfg, db); err != nil { + log.E.F("failed to initialize blossom server: %v", err) + // Continue without blossom server + } else if l.blossomServer != nil { + log.I.F("blossom blob storage server initialized") + } + // Ensure a relay identity secret key exists when subscriptions and NWC are enabled if cfg.SubscriptionEnabled && cfg.NWCUri != "" { if skb, e := db.GetOrCreateRelayIdentitySecret(); e != nil { diff --git a/app/payment_processor.go b/app/payment_processor.go index 9caa2db..09122ba 100644 --- a/app/payment_processor.go +++ b/app/payment_processor.go @@ -505,7 +505,9 @@ func (pp *PaymentProcessor) handleNotification( // Prefer explicit payer/relay pubkeys if provided in metadata var payerPubkey []byte var userNpub string - if metadata, ok := notification["metadata"].(map[string]any); ok { + var metadata map[string]any + if md, ok := notification["metadata"].(map[string]any); ok { + metadata = md if s, ok := metadata["payer_pubkey"].(string); ok && s != "" { if pk, err := decodeAnyPubkey(s); err == nil { payerPubkey = pk @@ -565,6 +567,11 @@ func (pp *PaymentProcessor) handleNotification( } satsReceived := int64(amount / 1000) + + // Parse zap memo for blossom service level + blossomLevel := pp.parseBlossomServiceLevel(description, metadata) + + // Calculate subscription days (for relay access) monthlyPrice := pp.config.MonthlyPriceSats if monthlyPrice <= 0 { monthlyPrice = 6000 @@ -575,10 +582,19 @@ func (pp *PaymentProcessor) handleNotification( return fmt.Errorf("payment amount too small") } + // Extend relay subscription if err := pp.db.ExtendSubscription(pubkey, days); err != nil { return fmt.Errorf("failed to extend subscription: %w", err) } + // If blossom service level specified, extend blossom subscription + if blossomLevel != "" { + if err := pp.extendBlossomSubscription(pubkey, satsReceived, blossomLevel, days); err != nil { + log.W.F("failed to extend blossom subscription: %v", err) + // Don't fail the payment if blossom subscription fails + } + } + // Record payment history invoice, _ := notification["invoice"].(string) preimage, _ := notification["preimage"].(string) @@ -888,6 +904,118 @@ func (pp *PaymentProcessor) npubToPubkey(npubStr string) ([]byte, error) { return pubkey, nil } +// parseBlossomServiceLevel parses the zap memo for a blossom service level specification +// Format: "blossom:level" or "blossom:level:storage_mb" in description or metadata memo field +func (pp *PaymentProcessor) parseBlossomServiceLevel( + description string, metadata map[string]any, +) string { + // Check metadata memo field first + if metadata != nil { + if memo, ok := metadata["memo"].(string); ok && memo != "" { + if level := pp.extractBlossomLevelFromMemo(memo); level != "" { + return level + } + } + } + + // Check description + if description != "" { + if level := pp.extractBlossomLevelFromMemo(description); level != "" { + return level + } + } + + return "" +} + +// extractBlossomLevelFromMemo extracts blossom service level from memo text +// Supports formats: "blossom:basic", "blossom:premium", "blossom:basic:100" +func (pp *PaymentProcessor) extractBlossomLevelFromMemo(memo string) string { + // Look for "blossom:" prefix + parts := strings.Fields(memo) + for _, part := range parts { + if strings.HasPrefix(part, "blossom:") { + // Extract level name (e.g., "basic", "premium") + levelPart := strings.TrimPrefix(part, "blossom:") + // Remove any storage specification (e.g., ":100") + if colonIdx := strings.Index(levelPart, ":"); colonIdx > 0 { + levelPart = levelPart[:colonIdx] + } + // Validate level exists in config + if pp.isValidBlossomLevel(levelPart) { + return levelPart + } + } + } + return "" +} + +// isValidBlossomLevel checks if a service level is configured +func (pp *PaymentProcessor) isValidBlossomLevel(level string) bool { + if pp.config == nil || pp.config.BlossomServiceLevels == "" { + return false + } + + // Parse service levels from config + levels := strings.Split(pp.config.BlossomServiceLevels, ",") + for _, l := range levels { + l = strings.TrimSpace(l) + if strings.HasPrefix(l, level+":") { + return true + } + } + return false +} + +// parseServiceLevelStorage parses storage quota in MB per sat per month for a service level +func (pp *PaymentProcessor) parseServiceLevelStorage(level string) (int64, error) { + if pp.config == nil || pp.config.BlossomServiceLevels == "" { + return 0, fmt.Errorf("blossom service levels not configured") + } + + levels := strings.Split(pp.config.BlossomServiceLevels, ",") + for _, l := range levels { + l = strings.TrimSpace(l) + if strings.HasPrefix(l, level+":") { + parts := strings.Split(l, ":") + if len(parts) >= 2 { + var storageMB float64 + if _, err := fmt.Sscanf(parts[1], "%f", &storageMB); err != nil { + return 0, fmt.Errorf("invalid storage format: %w", err) + } + return int64(storageMB), nil + } + } + } + return 0, fmt.Errorf("service level %s not found", level) +} + +// extendBlossomSubscription extends or creates a blossom subscription with service level +func (pp *PaymentProcessor) extendBlossomSubscription( + pubkey []byte, satsReceived int64, level string, days int, +) error { + // Get storage quota per sat per month for this level + storageMBPerSatPerMonth, err := pp.parseServiceLevelStorage(level) + if err != nil { + return fmt.Errorf("failed to parse service level storage: %w", err) + } + + // Calculate storage quota: sats * storage_mb_per_sat_per_month * (days / 30) + storageMB := int64(float64(satsReceived) * float64(storageMBPerSatPerMonth) * (float64(days) / 30.0)) + + // Extend blossom subscription + if err := pp.db.ExtendBlossomSubscription(pubkey, level, storageMB, days); err != nil { + return fmt.Errorf("failed to extend blossom subscription: %w", err) + } + + log.I.F( + "extended blossom subscription: level=%s, storage=%d MB, days=%d", + level, storageMB, days, + ) + + return nil +} + // UpdateRelayProfile creates or updates the relay's kind 0 profile with subscription information func (pp *PaymentProcessor) UpdateRelayProfile() error { // Get relay identity secret to sign the profile diff --git a/app/server.go b/app/server.go index 72b3d88..1fd9cf8 100644 --- a/app/server.go +++ b/app/server.go @@ -27,6 +27,7 @@ import ( "next.orly.dev/pkg/protocol/httpauth" "next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/spider" + blossom "next.orly.dev/pkg/blossom" ) type Server struct { @@ -49,6 +50,7 @@ type Server struct { sprocketManager *SprocketManager policyManager *policy.P spiderManager *spider.Spider + blossomServer *blossom.Server } // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system @@ -241,6 +243,12 @@ func (s *Server) UserInterface() { s.mux.HandleFunc("/api/nip86", s.handleNIP86Management) // ACL mode endpoint s.mux.HandleFunc("/api/acl-mode", s.handleACLMode) + + // Blossom blob storage API endpoint + if s.blossomServer != nil { + s.mux.HandleFunc("/blossom/", s.blossomHandler) + log.Printf("Blossom blob storage API enabled at /blossom") + } } // handleFavicon serves orly-favicon.png as favicon.ico diff --git a/pkg/blossom/handlers.go b/pkg/blossom/handlers.go index 6d7d8fa..411d06a 100644 --- a/pkg/blossom/handlers.go +++ b/pkg/blossom/handlers.go @@ -57,7 +57,7 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) { // Optional authorization check (BUD-01) if s.requireAuth { - authEv, err := ValidateAuthEventForGet(r, s.baseURL, sha256Hash) + authEv, err := ValidateAuthEventForGet(r, s.getBaseURL(r), sha256Hash) if err != nil { s.setErrorResponse(w, http.StatusUnauthorized, "authorization required") return @@ -142,7 +142,7 @@ func (s *Server) handleHeadBlob(w http.ResponseWriter, r *http.Request) { // Optional authorization check if s.requireAuth { - authEv, err := ValidateAuthEventForGet(r, s.baseURL, sha256Hash) + authEv, err := ValidateAuthEventForGet(r, s.getBaseURL(r), sha256Hash) if err != nil { s.setErrorResponse(w, http.StatusUnauthorized, "authorization required") return @@ -233,6 +233,34 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) { return } + // Check storage quota if blob doesn't exist (new upload) + if !exists { + blobSizeMB := int64(len(body)) / (1024 * 1024) + if blobSizeMB == 0 && len(body) > 0 { + blobSizeMB = 1 // At least 1 MB for any non-zero blob + } + + // Get storage quota from database + quotaMB, err := s.db.GetBlossomStorageQuota(pubkey) + if err != nil { + log.W.F("failed to get storage quota: %v", err) + } else if quotaMB > 0 { + // Get current storage used + usedMB, err := s.storage.GetTotalStorageUsed(pubkey) + if err != nil { + log.W.F("failed to calculate storage used: %v", err) + } else { + // Check if upload would exceed quota + if usedMB+blobSizeMB > quotaMB { + s.setErrorResponse(w, http.StatusPaymentRequired, + fmt.Sprintf("storage quota exceeded: %d/%d MB used, %d MB needed", + usedMB, quotaMB, blobSizeMB)) + return + } + } + } + } + // Save blob if it doesn't exist if !exists { if err = s.storage.SaveBlob(sha256Hash, body, pubkey, mimeType, ext); err != nil { @@ -257,7 +285,7 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) { } // Build URL with extension - blobURL := BuildBlobURL(s.baseURL, sha256Hex, ext) + blobURL := BuildBlobURL(s.getBaseURL(r), sha256Hex, ext) // Create descriptor descriptor := NewBlobDescriptor( @@ -434,7 +462,7 @@ func (s *Server) handleListBlobs(w http.ResponseWriter, r *http.Request) { // Set URLs for descriptors for _, desc := range descriptors { - desc.URL = BuildBlobURL(s.baseURL, desc.SHA256, "") + desc.URL = BuildBlobURL(s.getBaseURL(r), desc.SHA256, "") } // Return JSON array @@ -605,7 +633,7 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) { } // Build URL - blobURL := BuildBlobURL(s.baseURL, sha256Hex, ext) + blobURL := BuildBlobURL(s.getBaseURL(r), sha256Hex, ext) // Create descriptor descriptor := NewBlobDescriptor( @@ -685,6 +713,42 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) { optimizedHash := CalculateSHA256(optimizedData) optimizedHex := hex.Enc(optimizedHash) + // Check if optimized blob already exists + exists, err := s.storage.HasBlob(optimizedHash) + if err != nil { + log.E.F("error checking blob existence: %v", err) + s.setErrorResponse(w, http.StatusInternalServerError, "internal server error") + return + } + + // Check storage quota if optimized blob doesn't exist (new upload) + if !exists { + blobSizeMB := int64(len(optimizedData)) / (1024 * 1024) + if blobSizeMB == 0 && len(optimizedData) > 0 { + blobSizeMB = 1 // At least 1 MB for any non-zero blob + } + + // Get storage quota from database + quotaMB, err := s.db.GetBlossomStorageQuota(pubkey) + if err != nil { + log.W.F("failed to get storage quota: %v", err) + } else if quotaMB > 0 { + // Get current storage used + usedMB, err := s.storage.GetTotalStorageUsed(pubkey) + if err != nil { + log.W.F("failed to calculate storage used: %v", err) + } else { + // Check if upload would exceed quota + if usedMB+blobSizeMB > quotaMB { + s.setErrorResponse(w, http.StatusPaymentRequired, + fmt.Sprintf("storage quota exceeded: %d/%d MB used, %d MB needed", + usedMB, quotaMB, blobSizeMB)) + return + } + } + } + } + // Save optimized blob if err = s.storage.SaveBlob(optimizedHash, optimizedData, pubkey, mimeType, ext); err != nil { log.E.F("error saving optimized blob: %v", err) diff --git a/pkg/blossom/server.go b/pkg/blossom/server.go index d5a5a44..007f60f 100644 --- a/pkg/blossom/server.go +++ b/pkg/blossom/server.go @@ -27,7 +27,6 @@ type Config struct { MaxBlobSize int64 AllowedMimeTypes []string RequireAuth bool - BlobDir string // Directory for storing blob files } // NewServer creates a new Blossom server instance @@ -39,7 +38,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server { } } - storage := NewStorage(db, cfg.BlobDir) + storage := NewStorage(db) // Build allowed MIME types map allowedMap := make(map[string]bool) @@ -198,3 +197,14 @@ func (s *Server) checkACL( return actual >= required } + +// getBaseURL returns the base URL, preferring request context if available +func (s *Server) getBaseURL(r *http.Request) string { + type baseURLKey struct{} + if baseURL := r.Context().Value(baseURLKey{}); baseURL != nil { + if url, ok := baseURL.(string); ok && url != "" { + return url + } + } + return s.baseURL +} diff --git a/pkg/blossom/storage.go b/pkg/blossom/storage.go index 58644f9..f054185 100644 --- a/pkg/blossom/storage.go +++ b/pkg/blossom/storage.go @@ -17,8 +17,8 @@ import ( const ( // Database key prefixes (metadata and indexes only, blob data stored as files) - prefixBlobMeta = "blob:meta:" - prefixBlobIndex = "blob:index:" + prefixBlobMeta = "blob:meta:" + prefixBlobIndex = "blob:index:" prefixBlobReport = "blob:report:" ) @@ -29,7 +29,10 @@ type Storage struct { } // NewStorage creates a new storage instance -func NewStorage(db *database.D, blobDir string) *Storage { +func NewStorage(db *database.D) *Storage { + // Derive blob directory from database path + blobDir := filepath.Join(db.Path(), "blossom") + // Ensure blob directory exists if err := os.MkdirAll(blobDir, 0755); err != nil { log.E.F("failed to create blob directory %s: %v", blobDir, err) @@ -271,10 +274,10 @@ func (s *Storage) ListBlobs( for it.Rewind(); it.Valid(); it.Next() { item := it.Item() key := item.Key() - + // Extract SHA256 from key: prefixBlobIndex + pubkeyHex + ":" + sha256Hex sha256Hex := string(key[len(prefix):]) - + // Get blob metadata metaKey := prefixBlobMeta + sha256Hex metaItem, err := txn.Get([]byte(metaKey)) @@ -326,6 +329,62 @@ func (s *Storage) ListBlobs( return } +// GetTotalStorageUsed calculates total storage used by a pubkey in MB +func (s *Storage) GetTotalStorageUsed(pubkey []byte) (totalMB int64, err error) { + pubkeyHex := hex.Enc(pubkey) + prefix := prefixBlobIndex + pubkeyHex + ":" + + totalBytes := int64(0) + + if err = s.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = []byte(prefix) + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + + // Extract SHA256 from key: prefixBlobIndex + pubkeyHex + ":" + sha256Hex + sha256Hex := string(key[len(prefix):]) + + // Get blob metadata + metaKey := prefixBlobMeta + sha256Hex + metaItem, err := txn.Get([]byte(metaKey)) + if err != nil { + continue + } + + var metadata *BlobMetadata + if err = metaItem.Value(func(val []byte) error { + if metadata, err = DeserializeBlobMetadata(val); err != nil { + return err + } + return nil + }); err != nil { + continue + } + + // Verify blob file exists + blobPath := s.getBlobPath(sha256Hex, metadata.Extension) + if _, errGet := os.Stat(blobPath); errGet != nil { + continue + } + + totalBytes += metadata.Size + } + + return nil + }); chk.E(err) { + return + } + + // Convert bytes to MB (rounding up) + totalMB = (totalBytes + 1024*1024 - 1) / (1024 * 1024) + return +} + // SaveReport stores a report for a blob (BUD-09) func (s *Storage) SaveReport(sha256Hash []byte, reportData []byte) (err error) { sha256Hex := hex.Enc(sha256Hash) @@ -394,4 +453,3 @@ func (s *Storage) GetBlobMetadata(sha256Hash []byte) (metadata *BlobMetadata, er return } - diff --git a/pkg/blossom/utils_test.go b/pkg/blossom/utils_test.go index f382979..bdd2354 100644 --- a/pkg/blossom/utils_test.go +++ b/pkg/blossom/utils_test.go @@ -39,19 +39,12 @@ func testSetup(t *testing.T) (*Server, func()) { // Create ACL registry aclRegistry := acl.Registry - // Create temporary directory for blob storage - blobDir, err := os.MkdirTemp("", "blossom-blobs-*") - if err != nil { - t.Fatalf("Failed to create blob temp dir: %v", err) - } - // Create server cfg := &Config{ BaseURL: "http://localhost:8080", MaxBlobSize: 100 * 1024 * 1024, // 100MB AllowedMimeTypes: nil, RequireAuth: false, - BlobDir: blobDir, } server := NewServer(db, aclRegistry, cfg) @@ -60,7 +53,6 @@ func testSetup(t *testing.T) (*Server, func()) { cancel() db.Close() os.RemoveAll(tempDir) - os.RemoveAll(blobDir) } return server, cleanup diff --git a/pkg/database/subscriptions.go b/pkg/database/subscriptions.go index b21beb0..e57f716 100644 --- a/pkg/database/subscriptions.go +++ b/pkg/database/subscriptions.go @@ -13,8 +13,10 @@ import ( ) type Subscription struct { - TrialEnd time.Time `json:"trial_end"` - PaidUntil time.Time `json:"paid_until"` + TrialEnd time.Time `json:"trial_end"` + PaidUntil time.Time `json:"paid_until"` + BlossomLevel string `json:"blossom_level,omitempty"` // Service level name (e.g., "basic", "premium") + BlossomStorage int64 `json:"blossom_storage,omitempty"` // Storage quota in MB } func (d *D) GetSubscription(pubkey []byte) (*Subscription, error) { @@ -190,6 +192,77 @@ func (d *D) GetPaymentHistory(pubkey []byte) ([]Payment, error) { return payments, err } +// ExtendBlossomSubscription extends or creates a blossom subscription with service level +func (d *D) ExtendBlossomSubscription( + pubkey []byte, level string, storageMB int64, days int, +) error { + if days <= 0 { + return fmt.Errorf("invalid days: %d", days) + } + + key := fmt.Sprintf("sub:%s", hex.EncodeToString(pubkey)) + now := time.Now() + + return d.DB.Update( + func(txn *badger.Txn) error { + var sub Subscription + item, err := txn.Get([]byte(key)) + if errors.Is(err, badger.ErrKeyNotFound) { + sub.PaidUntil = now.AddDate(0, 0, days) + } else if err != nil { + return err + } else { + err = item.Value( + func(val []byte) error { + return json.Unmarshal(val, &sub) + }, + ) + if err != nil { + return err + } + extendFrom := now + if !sub.PaidUntil.IsZero() && sub.PaidUntil.After(now) { + extendFrom = sub.PaidUntil + } + sub.PaidUntil = extendFrom.AddDate(0, 0, days) + } + + // Set blossom service level and storage + sub.BlossomLevel = level + // Add storage quota (accumulate if subscription already exists) + if sub.BlossomStorage > 0 && sub.PaidUntil.After(now) { + // Add to existing quota + sub.BlossomStorage += storageMB + } else { + // Set new quota + sub.BlossomStorage = storageMB + } + + data, err := json.Marshal(&sub) + if err != nil { + return err + } + return txn.Set([]byte(key), data) + }, + ) +} + +// GetBlossomStorageQuota returns the current blossom storage quota in MB for a pubkey +func (d *D) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) { + sub, err := d.GetSubscription(pubkey) + if err != nil { + return 0, err + } + if sub == nil { + return 0, nil + } + // Only return quota if subscription is active + if sub.PaidUntil.IsZero() || time.Now().After(sub.PaidUntil) { + return 0, nil + } + return sub.BlossomStorage, nil +} + // IsFirstTimeUser checks if a user is logging in for the first time and marks them as seen func (d *D) IsFirstTimeUser(pubkey []byte) (bool, error) { key := fmt.Sprintf("firstlogin:%s", hex.EncodeToString(pubkey))