Add Blossom blob storage server and subscription management
- Introduced the `initializeBlossomServer` function to set up the Blossom blob storage server with dynamic base URL handling and ACL configuration. - Implemented the `blossomHandler` method to manage incoming requests to the Blossom API, ensuring proper URL handling and context management. - Enhanced the `PaymentProcessor` to support Blossom service levels, allowing for subscription extensions based on payment metadata. - Added methods for parsing and validating Blossom service levels, including storage quota management and subscription extension logic. - Updated the configuration to include Blossom service level settings, facilitating dynamic service level management. - Integrated storage quota checks in the blob upload process to prevent exceeding allocated limits. - Refactored existing code to improve organization and maintainability, including the removal of unused blob directory configurations. - Added tests to ensure the robustness of new functionalities and maintain existing behavior across blob operations.
This commit is contained in:
@@ -57,7 +57,7 @@ func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Optional authorization check (BUD-01)
|
||||
if s.requireAuth {
|
||||
authEv, err := ValidateAuthEventForGet(r, s.baseURL, sha256Hash)
|
||||
authEv, err := ValidateAuthEventForGet(r, s.getBaseURL(r), sha256Hash)
|
||||
if err != nil {
|
||||
s.setErrorResponse(w, http.StatusUnauthorized, "authorization required")
|
||||
return
|
||||
@@ -142,7 +142,7 @@ func (s *Server) handleHeadBlob(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Optional authorization check
|
||||
if s.requireAuth {
|
||||
authEv, err := ValidateAuthEventForGet(r, s.baseURL, sha256Hash)
|
||||
authEv, err := ValidateAuthEventForGet(r, s.getBaseURL(r), sha256Hash)
|
||||
if err != nil {
|
||||
s.setErrorResponse(w, http.StatusUnauthorized, "authorization required")
|
||||
return
|
||||
@@ -233,6 +233,34 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check storage quota if blob doesn't exist (new upload)
|
||||
if !exists {
|
||||
blobSizeMB := int64(len(body)) / (1024 * 1024)
|
||||
if blobSizeMB == 0 && len(body) > 0 {
|
||||
blobSizeMB = 1 // At least 1 MB for any non-zero blob
|
||||
}
|
||||
|
||||
// Get storage quota from database
|
||||
quotaMB, err := s.db.GetBlossomStorageQuota(pubkey)
|
||||
if err != nil {
|
||||
log.W.F("failed to get storage quota: %v", err)
|
||||
} else if quotaMB > 0 {
|
||||
// Get current storage used
|
||||
usedMB, err := s.storage.GetTotalStorageUsed(pubkey)
|
||||
if err != nil {
|
||||
log.W.F("failed to calculate storage used: %v", err)
|
||||
} else {
|
||||
// Check if upload would exceed quota
|
||||
if usedMB+blobSizeMB > quotaMB {
|
||||
s.setErrorResponse(w, http.StatusPaymentRequired,
|
||||
fmt.Sprintf("storage quota exceeded: %d/%d MB used, %d MB needed",
|
||||
usedMB, quotaMB, blobSizeMB))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save blob if it doesn't exist
|
||||
if !exists {
|
||||
if err = s.storage.SaveBlob(sha256Hash, body, pubkey, mimeType, ext); err != nil {
|
||||
@@ -257,7 +285,7 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Build URL with extension
|
||||
blobURL := BuildBlobURL(s.baseURL, sha256Hex, ext)
|
||||
blobURL := BuildBlobURL(s.getBaseURL(r), sha256Hex, ext)
|
||||
|
||||
// Create descriptor
|
||||
descriptor := NewBlobDescriptor(
|
||||
@@ -434,7 +462,7 @@ func (s *Server) handleListBlobs(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Set URLs for descriptors
|
||||
for _, desc := range descriptors {
|
||||
desc.URL = BuildBlobURL(s.baseURL, desc.SHA256, "")
|
||||
desc.URL = BuildBlobURL(s.getBaseURL(r), desc.SHA256, "")
|
||||
}
|
||||
|
||||
// Return JSON array
|
||||
@@ -605,7 +633,7 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Build URL
|
||||
blobURL := BuildBlobURL(s.baseURL, sha256Hex, ext)
|
||||
blobURL := BuildBlobURL(s.getBaseURL(r), sha256Hex, ext)
|
||||
|
||||
// Create descriptor
|
||||
descriptor := NewBlobDescriptor(
|
||||
@@ -685,6 +713,42 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) {
|
||||
optimizedHash := CalculateSHA256(optimizedData)
|
||||
optimizedHex := hex.Enc(optimizedHash)
|
||||
|
||||
// Check if optimized blob already exists
|
||||
exists, err := s.storage.HasBlob(optimizedHash)
|
||||
if err != nil {
|
||||
log.E.F("error checking blob existence: %v", err)
|
||||
s.setErrorResponse(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
|
||||
// Check storage quota if optimized blob doesn't exist (new upload)
|
||||
if !exists {
|
||||
blobSizeMB := int64(len(optimizedData)) / (1024 * 1024)
|
||||
if blobSizeMB == 0 && len(optimizedData) > 0 {
|
||||
blobSizeMB = 1 // At least 1 MB for any non-zero blob
|
||||
}
|
||||
|
||||
// Get storage quota from database
|
||||
quotaMB, err := s.db.GetBlossomStorageQuota(pubkey)
|
||||
if err != nil {
|
||||
log.W.F("failed to get storage quota: %v", err)
|
||||
} else if quotaMB > 0 {
|
||||
// Get current storage used
|
||||
usedMB, err := s.storage.GetTotalStorageUsed(pubkey)
|
||||
if err != nil {
|
||||
log.W.F("failed to calculate storage used: %v", err)
|
||||
} else {
|
||||
// Check if upload would exceed quota
|
||||
if usedMB+blobSizeMB > quotaMB {
|
||||
s.setErrorResponse(w, http.StatusPaymentRequired,
|
||||
fmt.Sprintf("storage quota exceeded: %d/%d MB used, %d MB needed",
|
||||
usedMB, quotaMB, blobSizeMB))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save optimized blob
|
||||
if err = s.storage.SaveBlob(optimizedHash, optimizedData, pubkey, mimeType, ext); err != nil {
|
||||
log.E.F("error saving optimized blob: %v", err)
|
||||
|
||||
@@ -27,7 +27,6 @@ type Config struct {
|
||||
MaxBlobSize int64
|
||||
AllowedMimeTypes []string
|
||||
RequireAuth bool
|
||||
BlobDir string // Directory for storing blob files
|
||||
}
|
||||
|
||||
// NewServer creates a new Blossom server instance
|
||||
@@ -39,7 +38,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
|
||||
}
|
||||
}
|
||||
|
||||
storage := NewStorage(db, cfg.BlobDir)
|
||||
storage := NewStorage(db)
|
||||
|
||||
// Build allowed MIME types map
|
||||
allowedMap := make(map[string]bool)
|
||||
@@ -198,3 +197,14 @@ func (s *Server) checkACL(
|
||||
|
||||
return actual >= required
|
||||
}
|
||||
|
||||
// getBaseURL returns the base URL, preferring request context if available
|
||||
func (s *Server) getBaseURL(r *http.Request) string {
|
||||
type baseURLKey struct{}
|
||||
if baseURL := r.Context().Value(baseURLKey{}); baseURL != nil {
|
||||
if url, ok := baseURL.(string); ok && url != "" {
|
||||
return url
|
||||
}
|
||||
}
|
||||
return s.baseURL
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ import (
|
||||
|
||||
const (
|
||||
// Database key prefixes (metadata and indexes only, blob data stored as files)
|
||||
prefixBlobMeta = "blob:meta:"
|
||||
prefixBlobIndex = "blob:index:"
|
||||
prefixBlobMeta = "blob:meta:"
|
||||
prefixBlobIndex = "blob:index:"
|
||||
prefixBlobReport = "blob:report:"
|
||||
)
|
||||
|
||||
@@ -29,7 +29,10 @@ type Storage struct {
|
||||
}
|
||||
|
||||
// NewStorage creates a new storage instance
|
||||
func NewStorage(db *database.D, blobDir string) *Storage {
|
||||
func NewStorage(db *database.D) *Storage {
|
||||
// Derive blob directory from database path
|
||||
blobDir := filepath.Join(db.Path(), "blossom")
|
||||
|
||||
// Ensure blob directory exists
|
||||
if err := os.MkdirAll(blobDir, 0755); err != nil {
|
||||
log.E.F("failed to create blob directory %s: %v", blobDir, err)
|
||||
@@ -271,10 +274,10 @@ func (s *Storage) ListBlobs(
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
key := item.Key()
|
||||
|
||||
|
||||
// Extract SHA256 from key: prefixBlobIndex + pubkeyHex + ":" + sha256Hex
|
||||
sha256Hex := string(key[len(prefix):])
|
||||
|
||||
|
||||
// Get blob metadata
|
||||
metaKey := prefixBlobMeta + sha256Hex
|
||||
metaItem, err := txn.Get([]byte(metaKey))
|
||||
@@ -326,6 +329,62 @@ func (s *Storage) ListBlobs(
|
||||
return
|
||||
}
|
||||
|
||||
// GetTotalStorageUsed calculates total storage used by a pubkey in MB
|
||||
func (s *Storage) GetTotalStorageUsed(pubkey []byte) (totalMB int64, err error) {
|
||||
pubkeyHex := hex.Enc(pubkey)
|
||||
prefix := prefixBlobIndex + pubkeyHex + ":"
|
||||
|
||||
totalBytes := int64(0)
|
||||
|
||||
if err = s.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = []byte(prefix)
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
key := item.Key()
|
||||
|
||||
// Extract SHA256 from key: prefixBlobIndex + pubkeyHex + ":" + sha256Hex
|
||||
sha256Hex := string(key[len(prefix):])
|
||||
|
||||
// Get blob metadata
|
||||
metaKey := prefixBlobMeta + sha256Hex
|
||||
metaItem, err := txn.Get([]byte(metaKey))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var metadata *BlobMetadata
|
||||
if err = metaItem.Value(func(val []byte) error {
|
||||
if metadata, err = DeserializeBlobMetadata(val); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Verify blob file exists
|
||||
blobPath := s.getBlobPath(sha256Hex, metadata.Extension)
|
||||
if _, errGet := os.Stat(blobPath); errGet != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
totalBytes += metadata.Size
|
||||
}
|
||||
|
||||
return nil
|
||||
}); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Convert bytes to MB (rounding up)
|
||||
totalMB = (totalBytes + 1024*1024 - 1) / (1024 * 1024)
|
||||
return
|
||||
}
|
||||
|
||||
// SaveReport stores a report for a blob (BUD-09)
|
||||
func (s *Storage) SaveReport(sha256Hash []byte, reportData []byte) (err error) {
|
||||
sha256Hex := hex.Enc(sha256Hash)
|
||||
@@ -394,4 +453,3 @@ func (s *Storage) GetBlobMetadata(sha256Hash []byte) (metadata *BlobMetadata, er
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -39,19 +39,12 @@ func testSetup(t *testing.T) (*Server, func()) {
|
||||
// Create ACL registry
|
||||
aclRegistry := acl.Registry
|
||||
|
||||
// Create temporary directory for blob storage
|
||||
blobDir, err := os.MkdirTemp("", "blossom-blobs-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create blob temp dir: %v", err)
|
||||
}
|
||||
|
||||
// Create server
|
||||
cfg := &Config{
|
||||
BaseURL: "http://localhost:8080",
|
||||
MaxBlobSize: 100 * 1024 * 1024, // 100MB
|
||||
AllowedMimeTypes: nil,
|
||||
RequireAuth: false,
|
||||
BlobDir: blobDir,
|
||||
}
|
||||
|
||||
server := NewServer(db, aclRegistry, cfg)
|
||||
@@ -60,7 +53,6 @@ func testSetup(t *testing.T) (*Server, func()) {
|
||||
cancel()
|
||||
db.Close()
|
||||
os.RemoveAll(tempDir)
|
||||
os.RemoveAll(blobDir)
|
||||
}
|
||||
|
||||
return server, cleanup
|
||||
|
||||
@@ -13,8 +13,10 @@ import (
|
||||
)
|
||||
|
||||
type Subscription struct {
|
||||
TrialEnd time.Time `json:"trial_end"`
|
||||
PaidUntil time.Time `json:"paid_until"`
|
||||
TrialEnd time.Time `json:"trial_end"`
|
||||
PaidUntil time.Time `json:"paid_until"`
|
||||
BlossomLevel string `json:"blossom_level,omitempty"` // Service level name (e.g., "basic", "premium")
|
||||
BlossomStorage int64 `json:"blossom_storage,omitempty"` // Storage quota in MB
|
||||
}
|
||||
|
||||
func (d *D) GetSubscription(pubkey []byte) (*Subscription, error) {
|
||||
@@ -190,6 +192,77 @@ func (d *D) GetPaymentHistory(pubkey []byte) ([]Payment, error) {
|
||||
return payments, err
|
||||
}
|
||||
|
||||
// ExtendBlossomSubscription extends or creates a blossom subscription with service level
|
||||
func (d *D) ExtendBlossomSubscription(
|
||||
pubkey []byte, level string, storageMB int64, days int,
|
||||
) error {
|
||||
if days <= 0 {
|
||||
return fmt.Errorf("invalid days: %d", days)
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("sub:%s", hex.EncodeToString(pubkey))
|
||||
now := time.Now()
|
||||
|
||||
return d.DB.Update(
|
||||
func(txn *badger.Txn) error {
|
||||
var sub Subscription
|
||||
item, err := txn.Get([]byte(key))
|
||||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||||
sub.PaidUntil = now.AddDate(0, 0, days)
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else {
|
||||
err = item.Value(
|
||||
func(val []byte) error {
|
||||
return json.Unmarshal(val, &sub)
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
extendFrom := now
|
||||
if !sub.PaidUntil.IsZero() && sub.PaidUntil.After(now) {
|
||||
extendFrom = sub.PaidUntil
|
||||
}
|
||||
sub.PaidUntil = extendFrom.AddDate(0, 0, days)
|
||||
}
|
||||
|
||||
// Set blossom service level and storage
|
||||
sub.BlossomLevel = level
|
||||
// Add storage quota (accumulate if subscription already exists)
|
||||
if sub.BlossomStorage > 0 && sub.PaidUntil.After(now) {
|
||||
// Add to existing quota
|
||||
sub.BlossomStorage += storageMB
|
||||
} else {
|
||||
// Set new quota
|
||||
sub.BlossomStorage = storageMB
|
||||
}
|
||||
|
||||
data, err := json.Marshal(&sub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txn.Set([]byte(key), data)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// GetBlossomStorageQuota returns the current blossom storage quota in MB for a pubkey
|
||||
func (d *D) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) {
|
||||
sub, err := d.GetSubscription(pubkey)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if sub == nil {
|
||||
return 0, nil
|
||||
}
|
||||
// Only return quota if subscription is active
|
||||
if sub.PaidUntil.IsZero() || time.Now().After(sub.PaidUntil) {
|
||||
return 0, nil
|
||||
}
|
||||
return sub.BlossomStorage, nil
|
||||
}
|
||||
|
||||
// IsFirstTimeUser checks if a user is logging in for the first time and marks them as seen
|
||||
func (d *D) IsFirstTimeUser(pubkey []byte) (bool, error) {
|
||||
key := fmt.Sprintf("firstlogin:%s", hex.EncodeToString(pubkey))
|
||||
|
||||
Reference in New Issue
Block a user