package sync import ( "bytes" "context" "encoding/json" "fmt" "net/http" "strings" "sync" "time" "lol.mleku.dev/log" "next.orly.dev/pkg/database" "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/encoders/hex" "next.orly.dev/pkg/encoders/tag" ) // Manager handles distributed synchronization between relay peers using serial numbers as clocks type Manager struct { ctx context.Context cancel context.CancelFunc db *database.D nodeID string relayURL string peers []string currentSerial uint64 peerSerials map[string]uint64 // peer URL -> latest serial seen relayGroupMgr *RelayGroupManager nip11Cache *NIP11Cache policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) } mutex sync.RWMutex } // CurrentRequest represents a request for the current serial number type CurrentRequest struct { NodeID string `json:"node_id"` RelayURL string `json:"relay_url"` } // CurrentResponse returns the current serial number type CurrentResponse struct { NodeID string `json:"node_id"` RelayURL string `json:"relay_url"` Serial uint64 `json:"serial"` } // EventIDsRequest represents a request for event IDs with serials type EventIDsRequest struct { NodeID string `json:"node_id"` RelayURL string `json:"relay_url"` From uint64 `json:"from"` To uint64 `json:"to"` } // EventIDsResponse contains event IDs mapped to their serial numbers type EventIDsResponse struct { EventMap map[string]uint64 `json:"event_map"` // event_id -> serial } // NewManager creates a new sync manager func NewManager(ctx context.Context, db *database.D, nodeID, relayURL string, peers []string, relayGroupMgr *RelayGroupManager, policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }) *Manager { ctx, cancel := context.WithCancel(ctx) m := &Manager{ ctx: ctx, cancel: cancel, db: db, nodeID: nodeID, relayURL: relayURL, peers: peers, currentSerial: 0, peerSerials: make(map[string]uint64), relayGroupMgr: relayGroupMgr, nip11Cache: NewNIP11Cache(30 * time.Minute), // Cache NIP-11 docs for 30 minutes policyManager: policyManager, } // Start sync routine go m.syncRoutine() return m } // Stop stops the sync manager func (m *Manager) Stop() { m.cancel() } // UpdatePeers updates the peer list from relay group configuration func (m *Manager) UpdatePeers(newPeers []string) { m.mutex.Lock() defer m.mutex.Unlock() m.peers = newPeers log.I.F("updated peer list to %d peers", len(newPeers)) } // IsAuthorizedPeer checks if a peer is authorized by validating its NIP-11 pubkey func (m *Manager) IsAuthorizedPeer(peerURL string, expectedPubkey string) bool { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() peerPubkey, err := m.nip11Cache.GetPubkey(ctx, peerURL) if err != nil { log.D.F("failed to fetch NIP-11 pubkey for %s: %v", peerURL, err) return false } return peerPubkey == expectedPubkey } // GetPeerPubkey fetches and caches the pubkey for a peer relay func (m *Manager) GetPeerPubkey(peerURL string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() return m.nip11Cache.GetPubkey(ctx, peerURL) } // GetCurrentSerial returns the current serial number func (m *Manager) GetCurrentSerial() uint64 { m.mutex.RLock() defer m.mutex.RUnlock() return m.currentSerial } // GetPeers returns a copy of the current peer list func (m *Manager) GetPeers() []string { m.mutex.RLock() defer m.mutex.RUnlock() peers := make([]string, len(m.peers)) copy(peers, m.peers) return peers } // UpdateSerial updates the current serial number when a new event is stored func (m *Manager) UpdateSerial() { m.mutex.Lock() defer m.mutex.Unlock() // Get the latest serial from database if latest, err := m.getLatestSerial(); err == nil { m.currentSerial = latest } } // getLatestSerial gets the latest serial number from the database func (m *Manager) getLatestSerial() (uint64, error) { // This is a simplified implementation // In practice, you'd want to track the highest serial number // For now, return the current serial return m.currentSerial, nil } // syncRoutine periodically syncs with peers sequentially func (m *Manager) syncRoutine() { ticker := time.NewTicker(5 * time.Second) // Sync every 5 seconds defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.syncWithPeersSequentially() } } } // syncWithPeersSequentially syncs with all configured peers one at a time func (m *Manager) syncWithPeersSequentially() { for _, peerURL := range m.peers { m.syncWithPeer(peerURL) // Small delay between peers to avoid overwhelming time.Sleep(100 * time.Millisecond) } } // syncWithPeer syncs with a specific peer func (m *Manager) syncWithPeer(peerURL string) { // Get the peer's current serial currentReq := CurrentRequest{ NodeID: m.nodeID, RelayURL: m.relayURL, } jsonData, err := json.Marshal(currentReq) if err != nil { log.E.F("failed to marshal current request: %v", err) return } resp, err := http.Post(peerURL+"/api/sync/current", "application/json", bytes.NewBuffer(jsonData)) if err != nil { log.D.F("failed to get current serial from %s: %v", peerURL, err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.D.F("current request failed with %s: status %d", peerURL, resp.StatusCode) return } var currentResp CurrentResponse if err := json.NewDecoder(resp.Body).Decode(¤tResp); err != nil { log.E.F("failed to decode current response from %s: %v", peerURL, err) return } // Check if we need to sync peerSerial := currentResp.Serial ourLastSeen := m.peerSerials[peerURL] if peerSerial > ourLastSeen { // Request event IDs for the missing range m.requestEventIDs(peerURL, ourLastSeen+1, peerSerial) // Update our knowledge of peer's serial m.mutex.Lock() m.peerSerials[peerURL] = peerSerial m.mutex.Unlock() } } // requestEventIDs requests event IDs for a serial range from a peer func (m *Manager) requestEventIDs(peerURL string, from, to uint64) { req := EventIDsRequest{ NodeID: m.nodeID, RelayURL: m.relayURL, From: from, To: to, } jsonData, err := json.Marshal(req) if err != nil { log.E.F("failed to marshal event-ids request: %v", err) return } resp, err := http.Post(peerURL+"/api/sync/event-ids", "application/json", bytes.NewBuffer(jsonData)) if err != nil { log.E.F("failed to request event IDs from %s: %v", peerURL, err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.E.F("event-ids request failed with %s: status %d", peerURL, resp.StatusCode) return } var eventIDsResp EventIDsResponse if err := json.NewDecoder(resp.Body).Decode(&eventIDsResp); err != nil { log.E.F("failed to decode event-ids response from %s: %v", peerURL, err) return } // Check which events we don't have and request them via websocket missingEventIDs := m.findMissingEventIDs(eventIDsResp.EventMap) if len(missingEventIDs) > 0 { m.requestEventsViaWebsocket(missingEventIDs) log.I.F("requested %d missing events from peer %s", len(missingEventIDs), peerURL) } } // findMissingEventIDs checks which event IDs we don't have locally func (m *Manager) findMissingEventIDs(eventMap map[string]uint64) []string { var missing []string for eventID := range eventMap { // Check if we have this event locally // This is a simplified check - in practice you'd query the database if !m.hasEventLocally(eventID) { missing = append(missing, eventID) } } return missing } // hasEventLocally checks if we have a specific event func (m *Manager) hasEventLocally(eventID string) bool { // Convert hex event ID to bytes eventIDBytes, err := hex.Dec(eventID) if err != nil { log.D.F("invalid event ID format: %s", eventID) return false } // Query for the event f := &filter.F{ Ids: tag.NewFromBytesSlice(eventIDBytes), } events, err := m.db.QueryEvents(context.Background(), f) if err != nil { log.D.F("error querying for event %s: %v", eventID, err) return false } return len(events) > 0 } // requestEventsViaWebsocket requests specific events via websocket from peers func (m *Manager) requestEventsViaWebsocket(eventIDs []string) { if len(eventIDs) == 0 { return } // Convert hex event IDs to bytes for websocket requests var eventIDBytes [][]byte for _, eventID := range eventIDs { if bytes, err := hex.Dec(eventID); err == nil { eventIDBytes = append(eventIDBytes, bytes) } } if len(eventIDBytes) == 0 { return } // TODO: Implement websocket connection and REQ message sending // For now, try to request from our peers via their websocket endpoints for _, peerURL := range m.peers { // Convert HTTP URL to WebSocket URL wsURL := strings.Replace(peerURL, "http://", "ws://", 1) wsURL = strings.Replace(wsURL, "https://", "wss://", 1) log.D.F("would connect to %s and request %d events", wsURL, len(eventIDBytes)) // Here we would: // 1. Establish websocket connection to peer // 2. Send NIP-98 auth if required // 3. Send REQ message with the filter for specific event IDs // 4. Receive and process EVENT messages // 5. Import received events } limit := 5 if len(eventIDs) < limit { limit = len(eventIDs) } log.I.F("requested %d events via websocket: %v", len(eventIDs), eventIDs[:limit]) } // min returns the minimum of two integers func min(a, b int) int { if a < b { return a } return b } // getEventsWithIDs retrieves events with their IDs by serial range func (m *Manager) getEventsWithIDs(from, to uint64) (map[string]uint64, error) { eventMap := make(map[string]uint64) // Get event serials by serial range serials, err := m.db.EventIdsBySerial(from, int(to-from+1)) if err != nil { return nil, err } // For each serial, we need to map it to an event ID // This is a simplified implementation - in practice we'd need to query events by serial for i, serial := range serials { // TODO: Implement actual event ID retrieval by serial // For now, create placeholder event IDs based on serial eventID := fmt.Sprintf("event_%d", serial) eventMap[eventID] = serial _ = i // avoid unused variable warning } return eventMap, nil } // HandleCurrentRequest handles requests for current serial number func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req CurrentRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid JSON", http.StatusBadRequest) return } resp := CurrentResponse{ NodeID: m.nodeID, RelayURL: m.relayURL, Serial: m.GetCurrentSerial(), } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(resp) } // HandleEventIDsRequest handles requests for event IDs with their serial numbers func (m *Manager) HandleEventIDsRequest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req EventIDsRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid JSON", http.StatusBadRequest) return } // Get events with IDs in the requested range eventMap, err := m.getEventsWithIDs(req.From, req.To) if err != nil { http.Error(w, fmt.Sprintf("Failed to get event IDs: %v", err), http.StatusInternalServerError) return } resp := EventIDsResponse{ EventMap: eventMap, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(resp) }