diff --git a/app/config/config.go b/app/config/config.go index 06c1957..ab10a5a 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -50,6 +50,7 @@ type C struct { MonthlyPriceSats int64 `env:"ORLY_MONTHLY_PRICE_SATS" default:"6000" usage:"price in satoshis for one month subscription (default ~$2 USD)"` RelayURL string `env:"ORLY_RELAY_URL" usage:"base URL for the relay dashboard (e.g., https://relay.example.com)"` RelayAddresses []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com)"` + RelayPeers []string `env:"ORLY_RELAY_PEERS" usage:"comma-separated list of peer relay URLs for distributed synchronization (e.g., https://peer1.example.com,https://peer2.example.com)"` FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"` // Blossom blob storage service level settings diff --git a/app/handle-event.go b/app/handle-event.go index e4b02a7..55c51d8 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -455,6 +455,12 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { chk.E(err) return } + + // Update serial for distributed synchronization + if l.syncManager != nil { + l.syncManager.UpdateSerial() + log.D.F("updated serial for event %s", hex.Enc(env.E.ID)) + } // Send a success response storing if err = Ok.Ok(l, env, ""); chk.E(err) { return diff --git a/app/main.go b/app/main.go index b2021b1..6d4385f 100644 --- a/app/main.go +++ b/app/main.go @@ -20,6 +20,7 @@ import ( "next.orly.dev/pkg/policy" "next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/spider" + dsync "next.orly.dev/pkg/sync" ) func Run( @@ -116,6 +117,27 @@ func Run( } } + // Initialize sync manager if relay peers are configured + if len(cfg.RelayPeers) > 0 { + // Get relay identity for node ID + sk, err := db.GetOrCreateRelayIdentitySecret() + if err != nil { + log.E.F("failed to get relay identity for sync: %v", err) + } else { + nodeID, err := keys.SecretBytesToPubKeyHex(sk) + if err != nil { + log.E.F("failed to derive pubkey for sync node ID: %v", err) + } else { + relayURL := cfg.RelayURL + if relayURL == "" { + relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port) + } + l.syncManager = dsync.NewManager(ctx, db, nodeID, relayURL, cfg.RelayPeers) + log.I.F("distributed sync manager initialized with %d peers", len(cfg.RelayPeers)) + } + } + } + // Initialize the user interface l.UserInterface() diff --git a/app/server.go b/app/server.go index 1fd9cf8..ce9a03a 100644 --- a/app/server.go +++ b/app/server.go @@ -27,6 +27,7 @@ import ( "next.orly.dev/pkg/protocol/httpauth" "next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/spider" + dsync "next.orly.dev/pkg/sync" blossom "next.orly.dev/pkg/blossom" ) @@ -50,6 +51,7 @@ type Server struct { sprocketManager *SprocketManager policyManager *policy.P spiderManager *spider.Spider + syncManager *dsync.Manager blossomServer *blossom.Server } @@ -243,7 +245,14 @@ func (s *Server) UserInterface() { s.mux.HandleFunc("/api/nip86", s.handleNIP86Management) // ACL mode endpoint s.mux.HandleFunc("/api/acl-mode", s.handleACLMode) - + + // Sync endpoints for distributed synchronization + if s.syncManager != nil { + s.mux.HandleFunc("/api/sync/current", s.handleSyncCurrent) + s.mux.HandleFunc("/api/sync/fetch", s.handleSyncFetch) + log.Printf("Distributed sync API enabled at /api/sync") + } + // Blossom blob storage API endpoint if s.blossomServer != nil { s.mux.HandleFunc("/blossom/", s.blossomHandler) @@ -990,3 +999,70 @@ func (s *Server) handleACLMode(w http.ResponseWriter, r *http.Request) { w.Write(jsonData) } + +// handleSyncCurrent handles requests for the current serial number +func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) { + if s.syncManager == nil { + http.Error(w, "Sync manager not initialized", http.StatusServiceUnavailable) + return + } + + // Validate NIP-98 authentication and check peer authorization + if !s.validatePeerRequest(w, r) { + return + } + + s.syncManager.HandleCurrentRequest(w, r) +} + +// handleSyncFetch handles requests for events in a serial range +func (s *Server) handleSyncFetch(w http.ResponseWriter, r *http.Request) { + if s.syncManager == nil { + http.Error(w, "Sync manager not initialized", http.StatusServiceUnavailable) + return + } + + // Validate NIP-98 authentication and check peer authorization + if !s.validatePeerRequest(w, r) { + return + } + + s.syncManager.HandleFetchRequest(w, r) +} + +// validatePeerRequest validates NIP-98 authentication and checks if the requesting peer is authorized +func (s *Server) validatePeerRequest(w http.ResponseWriter, r *http.Request) bool { + // Validate NIP-98 authentication + valid, pubkey, err := httpauth.CheckAuth(r) + if err != nil { + log.Printf("NIP-98 auth validation error: %v", err) + http.Error(w, "Authentication validation failed", http.StatusUnauthorized) + return false + } + if !valid { + http.Error(w, "NIP-98 authentication required", http.StatusUnauthorized) + return false + } + + // Check if this pubkey corresponds to a configured peer relay + peerPubkeyHex := hex.Enc(pubkey) + for range s.Config.RelayPeers { + // Extract pubkey from peer URL (assuming format: https://relay.example.com@pubkey) + // For now, check if the pubkey matches any configured admin/owner + // TODO: Implement proper peer identity mapping + for _, admin := range s.Admins { + if hex.Enc(admin) == peerPubkeyHex { + return true + } + } + for _, owner := range s.Owners { + if hex.Enc(owner) == peerPubkeyHex { + return true + } + } + } + + log.Printf("Unauthorized sync request from pubkey: %s", peerPubkeyHex) + http.Error(w, "Unauthorized peer", http.StatusForbidden) + return false +} diff --git a/pkg/database/import.go b/pkg/database/import.go index 61a6568..8e5470c 100644 --- a/pkg/database/import.go +++ b/pkg/database/import.go @@ -1,86 +1,17 @@ package database import ( - "bufio" "io" - "os" - "runtime/debug" "lol.mleku.dev/chk" "lol.mleku.dev/log" - "next.orly.dev/pkg/encoders/event" ) -const maxLen = 500000000 - // Import a collection of events in line structured minified JSON format (JSONL). func (d *D) Import(rr io.Reader) { - // store to disk so we can return fast - tmpPath := os.TempDir() + string(os.PathSeparator) + "orly" - os.MkdirAll(tmpPath, 0700) - tmp, err := os.CreateTemp(tmpPath, "") - if chk.E(err) { - return - } - log.I.F("buffering upload to %s", tmp.Name()) - if _, err = io.Copy(tmp, rr); chk.E(err) { - return - } - if _, err = tmp.Seek(0, 0); chk.E(err) { - return - } - go func() { - var err error - // Create a scanner to read the buffer line by line - scan := bufio.NewScanner(tmp) - scanBuf := make([]byte, maxLen) - scan.Buffer(scanBuf, maxLen) - - var count, total int - for scan.Scan() { - select { - case <-d.ctx.Done(): - log.I.F("context closed") - return - default: - } - - b := scan.Bytes() - total += len(b) + 1 - if len(b) < 1 { - continue - } - - ev := event.New() - if _, err = ev.Unmarshal(b); err != nil { - // return the pooled buffer on error - ev.Free() - continue - } - - if _, err = d.SaveEvent(d.ctx, ev); err != nil { - // return the pooled buffer on error paths too - ev.Free() - continue - } - - // return the pooled buffer after successful save - ev.Free() - b = nil - count++ - if count%100 == 0 { - log.I.F("received %d events", count) - debug.FreeOSMemory() - } + if err := d.ImportEventsFromReader(d.ctx, rr); chk.E(err) { + log.E.F("import failed: %v", err) } - - log.I.F("read %d bytes and saved %d events", total, count) - err = scan.Err() - if chk.E(err) { - } - - // Help garbage collection - tmp = nil }() } diff --git a/pkg/database/import_utils.go b/pkg/database/import_utils.go new file mode 100644 index 0000000..433bfe8 --- /dev/null +++ b/pkg/database/import_utils.go @@ -0,0 +1,101 @@ +// Package database provides shared import utilities for events +package database + +import ( + "bufio" + "context" + "io" + "os" + "runtime/debug" + "strings" + + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + "next.orly.dev/pkg/encoders/event" +) + +const maxLen = 500000000 + +// ImportEventsFromReader imports events from an io.Reader containing JSONL data +func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error { + // store to disk so we can return fast + tmpPath := os.TempDir() + string(os.PathSeparator) + "orly" + os.MkdirAll(tmpPath, 0700) + tmp, err := os.CreateTemp(tmpPath, "") + if chk.E(err) { + return err + } + defer os.Remove(tmp.Name()) // Clean up temp file when done + + log.I.F("buffering upload to %s", tmp.Name()) + if _, err = io.Copy(tmp, rr); chk.E(err) { + return err + } + if _, err = tmp.Seek(0, 0); chk.E(err) { + return err + } + + return d.processJSONLEvents(ctx, tmp) +} + +// ImportEventsFromStrings imports events from a slice of JSON strings +func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string) error { + // Create a reader from the string slice + reader := strings.NewReader(strings.Join(eventJSONs, "\n")) + return d.processJSONLEvents(ctx, reader) +} + +// processJSONLEvents processes JSONL events from a reader +func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader) error { + // Create a scanner to read the buffer line by line + scan := bufio.NewScanner(rr) + scanBuf := make([]byte, maxLen) + scan.Buffer(scanBuf, maxLen) + + var count, total int + for scan.Scan() { + select { + case <-ctx.Done(): + log.I.F("context closed") + return ctx.Err() + default: + } + + b := scan.Bytes() + total += len(b) + 1 + if len(b) < 1 { + continue + } + + ev := event.New() + if _, err := ev.Unmarshal(b); err != nil { + // return the pooled buffer on error + ev.Free() + log.W.F("failed to unmarshal event: %v", err) + continue + } + + if _, err := d.SaveEvent(ctx, ev); err != nil { + // return the pooled buffer on error paths too + ev.Free() + log.W.F("failed to save event: %v", err) + continue + } + + // return the pooled buffer after successful save + ev.Free() + b = nil + count++ + if count%100 == 0 { + log.I.F("processed %d events", count) + debug.FreeOSMemory() + } + } + + log.I.F("read %d bytes and saved %d events", total, count) + if err := scan.Err(); err != nil { + return err + } + + return nil +} diff --git a/pkg/protocol/blossom/blossom b/pkg/protocol/blossom/blossom new file mode 160000 index 0000000..e8d0a1e --- /dev/null +++ b/pkg/protocol/blossom/blossom @@ -0,0 +1 @@ +Subproject commit e8d0a1ec443709c040b292199b66c378a079c5d6 diff --git a/pkg/sync/manager.go b/pkg/sync/manager.go new file mode 100644 index 0000000..89d6945 --- /dev/null +++ b/pkg/sync/manager.go @@ -0,0 +1,288 @@ +package sync + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "lol.mleku.dev/log" + "next.orly.dev/pkg/database" +) + +// Manager handles distributed synchronization between relay peers using serial numbers as clocks +type Manager struct { + ctx context.Context + cancel context.CancelFunc + db *database.D + nodeID string + relayURL string + peers []string + currentSerial uint64 + peerSerials map[string]uint64 // peer URL -> latest serial seen + mutex sync.RWMutex +} + +// CurrentRequest represents a request for the current serial number +type CurrentRequest struct { + NodeID string `json:"node_id"` + RelayURL string `json:"relay_url"` +} + +// CurrentResponse returns the current serial number +type CurrentResponse struct { + NodeID string `json:"node_id"` + RelayURL string `json:"relay_url"` + Serial uint64 `json:"serial"` +} + +// FetchRequest represents a request for events in a serial range +type FetchRequest struct { + NodeID string `json:"node_id"` + RelayURL string `json:"relay_url"` + From uint64 `json:"from"` + To uint64 `json:"to"` +} + +// FetchResponse contains the requested events as JSONL +type FetchResponse struct { + Events []string `json:"events"` // JSONL formatted events +} + +// NewManager creates a new sync manager +func NewManager(ctx context.Context, db *database.D, nodeID, relayURL string, peers []string) *Manager { + ctx, cancel := context.WithCancel(ctx) + + m := &Manager{ + ctx: ctx, + cancel: cancel, + db: db, + nodeID: nodeID, + relayURL: relayURL, + peers: peers, + currentSerial: 0, + peerSerials: make(map[string]uint64), + } + + // Start sync routine + go m.syncRoutine() + + return m +} + +// Stop stops the sync manager +func (m *Manager) Stop() { + m.cancel() +} + +// GetCurrentSerial returns the current serial number +func (m *Manager) GetCurrentSerial() uint64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.currentSerial +} + +// UpdateSerial updates the current serial number when a new event is stored +func (m *Manager) UpdateSerial() { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Get the latest serial from database + if latest, err := m.getLatestSerial(); err == nil { + m.currentSerial = latest + } +} + +// getLatestSerial gets the latest serial number from the database +func (m *Manager) getLatestSerial() (uint64, error) { + // This is a simplified implementation + // In practice, you'd want to track the highest serial number + // For now, return the current serial + return m.currentSerial, nil +} + +// syncRoutine periodically syncs with peers +func (m *Manager) syncRoutine() { + ticker := time.NewTicker(5 * time.Second) // Sync every 5 seconds + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.syncWithPeers() + } + } +} + +// syncWithPeers syncs with all configured peers +func (m *Manager) syncWithPeers() { + for _, peerURL := range m.peers { + go m.syncWithPeer(peerURL) + } +} + +// syncWithPeer syncs with a specific peer +func (m *Manager) syncWithPeer(peerURL string) { + // Get the peer's current serial + currentReq := CurrentRequest{ + NodeID: m.nodeID, + RelayURL: m.relayURL, + } + + jsonData, err := json.Marshal(currentReq) + if err != nil { + log.E.F("failed to marshal current request: %v", err) + return + } + + resp, err := http.Post(peerURL+"/api/sync/current", "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + log.D.F("failed to get current serial from %s: %v", peerURL, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.D.F("current request failed with %s: status %d", peerURL, resp.StatusCode) + return + } + + var currentResp CurrentResponse + if err := json.NewDecoder(resp.Body).Decode(¤tResp); err != nil { + log.E.F("failed to decode current response from %s: %v", peerURL, err) + return + } + + // Check if we need to sync + peerSerial := currentResp.Serial + ourLastSeen := m.peerSerials[peerURL] + + if peerSerial > ourLastSeen { + // Request missing events + m.requestEvents(peerURL, ourLastSeen+1, peerSerial) + // Update our knowledge of peer's serial + m.mutex.Lock() + m.peerSerials[peerURL] = peerSerial + m.mutex.Unlock() + } +} + +// requestEvents requests a range of events from a peer +func (m *Manager) requestEvents(peerURL string, from, to uint64) { + req := FetchRequest{ + NodeID: m.nodeID, + RelayURL: m.relayURL, + From: from, + To: to, + } + + jsonData, err := json.Marshal(req) + if err != nil { + log.E.F("failed to marshal fetch request: %v", err) + return + } + + resp, err := http.Post(peerURL+"/api/sync/fetch", "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + log.E.F("failed to request events from %s: %v", peerURL, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.E.F("fetch request failed with %s: status %d", peerURL, resp.StatusCode) + return + } + + var fetchResp FetchResponse + if err := json.NewDecoder(resp.Body).Decode(&fetchResp); err != nil { + log.E.F("failed to decode fetch response from %s: %v", peerURL, err) + return + } + + // Import the received events + if len(fetchResp.Events) > 0 { + if err := m.db.ImportEventsFromStrings(context.Background(), fetchResp.Events); err != nil { + log.E.F("failed to import events from %s: %v", peerURL, err) + return + } + log.I.F("imported %d events from peer %s", len(fetchResp.Events), peerURL) + } +} + +// getEventsBySerialRange retrieves events by serial range from the database as JSONL +func (m *Manager) getEventsBySerialRange(from, to uint64) ([]string, error) { + var events []string + + // Get event serials by serial range + serials, err := m.db.EventIdsBySerial(from, int(to-from+1)) + if err != nil { + return nil, err + } + + // TODO: For each serial, retrieve the actual event and marshal to JSONL + // For now, return serial numbers as placeholder JSON strings + for _, serial := range serials { + // This should be replaced with actual event JSON marshalling + events = append(events, `{"serial":`+strconv.FormatUint(serial, 10)+`}`) + } + + return events, nil +} + +// HandleCurrentRequest handles requests for current serial number +func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req CurrentRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + resp := CurrentResponse{ + NodeID: m.nodeID, + RelayURL: m.relayURL, + Serial: m.GetCurrentSerial(), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// HandleFetchRequest handles requests for events in a serial range +func (m *Manager) HandleFetchRequest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req FetchRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Get events in the requested range + events, err := m.getEventsBySerialRange(req.From, req.To) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get events: %v", err), http.StatusInternalServerError) + return + } + + resp := FetchResponse{ + Events: events, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} diff --git a/pkg/version/version b/pkg/version/version index 670fd19..a283725 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.23.4 \ No newline at end of file +v0.24.0 \ No newline at end of file