Add NIP-11 relay synchronization and group management features

- Introduced a new `sync` package for managing NIP-11 relay information and relay group configurations.
- Implemented a cache for NIP-11 documents, allowing retrieval of relay public keys and authoritative configurations.
- Enhanced the sync manager to update peer lists based on authoritative configurations from relay group events.
- Updated event handling to incorporate policy checks during event imports, ensuring compliance with relay rules.
- Refactored various components to use the `sha256-simd` package for improved hashing performance (import sketch below).
- Added comprehensive tests to validate the new synchronization and group management functionalities.
- Bumped version to v0.24.1 to reflect these changes.
Date:   2025-11-03 18:17:15 +00:00
Parent: e161d0e4be
Commit: e56bf76257
83 changed files with 3712 additions and 7417 deletions
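The `sha256-simd` migration is normally a pure import swap. A minimal sketch, assuming the minio/sha256-simd package, which mirrors the crypto/sha256 API (an illustration, not code from this commit):

    package main

    import (
        "fmt"

        // Drop-in replacement for crypto/sha256; the package is also named
        // sha256 and selects SHA-extension/AVX2 code paths at runtime.
        "github.com/minio/sha256-simd"
    )

    func main() {
        // Same call surface as the standard library, so callers only
        // change the import path.
        sum := sha256.Sum256([]byte("serialized nostr event"))
        fmt.Printf("%x\n", sum[:])
    }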


@@ -51,6 +51,7 @@ type C struct {
     RelayURL         string   `env:"ORLY_RELAY_URL" usage:"base URL for the relay dashboard (e.g., https://relay.example.com)"`
     RelayAddresses   []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com)"`
     RelayPeers       []string `env:"ORLY_RELAY_PEERS" usage:"comma-separated list of peer relay URLs for distributed synchronization (e.g., https://peer1.example.com,https://peer2.example.com)"`
+    RelayGroupAdmins []string `env:"ORLY_RELAY_GROUP_ADMINS" usage:"comma-separated list of npubs authorized to publish relay group configuration events"`
     FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"`
     // Blossom blob storage service level settings
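For reference, a hypothetical deployment wiring the new option together with the existing peer list (URLs follow the usage strings above; the npubs are placeholders, not real identities):

    ORLY_RELAY_URL=https://relay.example.com
    ORLY_RELAY_PEERS=https://peer1.example.com,https://peer2.example.com
    ORLY_RELAY_GROUP_ADMINS=npub1adminplaceholder...,npub1backupplaceholder...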


@@ -136,8 +136,8 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
     log.D.F("policy allowed event %0x", env.E.ID)
-    // Check ACL policy for managed ACL mode
-    if acl.Registry.Active.Load() == "managed" {
+    // Check ACL policy for managed ACL mode, but skip for peer relay sync events
+    if acl.Registry.Active.Load() == "managed" && !l.isPeerRelayPubkey(l.authedPubkey.Load()) {
         allowed, aclErr := acl.Registry.CheckPolicy(env.E)
         if chk.E(aclErr) {
             log.E.F("ACL policy check failed: %v", aclErr)
@@ -456,6 +456,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
         return
     }
+    // Handle relay group configuration events
+    if l.relayGroupMgr != nil {
+        if err := l.relayGroupMgr.ValidateRelayGroupEvent(env.E); err != nil {
+            log.W.F("invalid relay group config event %s: %v", hex.Enc(env.E.ID), err)
+        }
+        // Process the event and potentially update peer lists
+        if l.syncManager != nil {
+            l.relayGroupMgr.HandleRelayGroupEvent(env.E, l.syncManager)
+        }
+    }
     // Update serial for distributed synchronization
     if l.syncManager != nil {
         l.syncManager.UpdateSerial()
@@ -501,3 +512,21 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
     }
     return
 }
+
+// isPeerRelayPubkey checks if the given pubkey belongs to a peer relay
+func (l *Listener) isPeerRelayPubkey(pubkey []byte) bool {
+    if l.syncManager == nil {
+        return false
+    }
+    peerPubkeyHex := hex.Enc(pubkey)
+    // Check if this pubkey matches any of our configured peer relays' NIP-11 pubkeys
+    for _, peerURL := range l.syncManager.GetPeers() {
+        if l.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) {
+            return true
+        }
+    }
+    return false
+}
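`IsAuthorizedPeer` leans on the NIP-11 cache this commit introduces. The lookup being cached can be sketched as follows; this illustrates the NIP-11 mechanics (the `pubkey` field and the Accept header are standard NIP-11), not the sync package's actual code:

    package sync // illustrative only

    import (
        "context"
        "encoding/json"
        "net/http"
    )

    // fetchRelayPubkey asks a relay for its NIP-11 information document and
    // returns the hex pubkey it advertises. NIP-11 documents are served from
    // the relay's HTTP endpoint when the client sends
    // Accept: application/nostr+json.
    func fetchRelayPubkey(ctx context.Context, relayURL string) (string, error) {
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, relayURL, nil)
        if err != nil {
            return "", err
        }
        req.Header.Set("Accept", "application/nostr+json")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return "", err
        }
        defer resp.Body.Close()
        var info struct {
            PubKey string `json:"pubkey"`
        }
        if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
            return "", err
        }
        return info.PubKey, nil
    }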


@@ -78,19 +78,24 @@ whitelist:
     defer conn.Close()
     listener := &Listener{
-        ctx:       ctx,
-        Server:    s,
-        conn:      conn,
-        remote:    remote,
-        req:       r,
-        startTime: time.Now(),
-        writeChan: make(chan publish.WriteRequest, 100), // Buffered channel for writes
-        writeDone: make(chan struct{}),
+        ctx:            ctx,
+        Server:         s,
+        conn:           conn,
+        remote:         remote,
+        req:            r,
+        startTime:      time.Now(),
+        writeChan:      make(chan publish.WriteRequest, 100), // Buffered channel for writes
+        writeDone:      make(chan struct{}),
+        messageQueue:   make(chan messageRequest, 100), // Buffered channel for message processing
+        processingDone: make(chan struct{}),
     }
     // Start write worker goroutine
     go listener.writeWorker()
+    // Start message processor goroutine
+    go listener.messageProcessor()
     // Register write channel with publisher
     if socketPub := listener.publishers.GetSocketPublisher(); socketPub != nil {
         socketPub.SetWriteChan(conn, listener.writeChan)
@@ -140,9 +145,9 @@ whitelist:
     // Log detailed connection statistics
     dur := time.Since(listener.startTime)
     log.D.F(
-        "ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
+        "ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, dropped=%d, duration=%v",
         remote, listener.msgCount, listener.reqCount, listener.eventCount,
-        dur,
+        listener.DroppedMessages(), dur,
     )
     // Log any remaining connection state
@@ -152,6 +157,11 @@ whitelist:
         log.D.F("ws connection %s was not authenticated", remote)
     }
+    // Close message queue to signal processor to exit
+    close(listener.messageQueue)
+    // Wait for message processor to finish
+    <-listener.processingDone
     // Close write channel to signal worker to exit
     close(listener.writeChan)
     // Wait for write worker to finish
@@ -212,7 +222,11 @@ whitelist:
         log.D.F("received large message from %s: %d bytes", remote, len(msg))
     }
     // log.T.F("received message from %s: %s", remote, string(msg))
-    listener.HandleMessage(msg, remote)
+    // Queue message for asynchronous processing
+    if !listener.QueueMessage(msg, remote) {
+        log.W.F("ws->%s message queue full, dropping message (capacity=%d)", remote, cap(listener.messageQueue))
+    }
 }
}
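The teardown added above follows the standard close-then-join handshake: stop producing, close the queue, wait for the consumer, then repeat for the write channel. A generic sketch of the pattern (not the handler's code):

    queue := make(chan []byte, 100)
    done := make(chan struct{})
    go func() {
        defer close(done)
        for msg := range queue { // drains remaining items, exits on close
            _ = msg // process
        }
    }()
    // ... producer stops reading from the socket ...
    close(queue) // signal: no more messages
    <-done       // join: consumer has finished

The real messageProcessor additionally selects on the connection context, so it can exit before draining if the context is cancelled.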


@@ -4,6 +4,7 @@ import (
     "context"
     "net/http"
     "strings"
+    "sync/atomic"
     "time"

     "github.com/gorilla/websocket"
@@ -15,7 +16,7 @@ import (
     "next.orly.dev/pkg/encoders/filter"
     "next.orly.dev/pkg/protocol/publish"
     "next.orly.dev/pkg/utils"
-    "next.orly.dev/pkg/utils/atomic"
+    atomicutils "next.orly.dev/pkg/utils/atomic"
 )
@@ -24,25 +25,59 @@ type Listener struct {
     ctx    context.Context
     remote string
     req    *http.Request
-    challenge    atomic.Bytes
-    authedPubkey atomic.Bytes
+    challenge    atomicutils.Bytes
+    authedPubkey atomicutils.Bytes
     startTime        time.Time
     isBlacklisted    bool      // Marker to identify blacklisted IPs
     blacklistTimeout time.Time // When to timeout blacklisted connections
     writeChan chan publish.WriteRequest // Channel for write requests (back to queued approach)
     writeDone chan struct{}             // Closed when write worker exits
+    // Message processing queue for async handling
+    messageQueue   chan messageRequest // Buffered channel for message processing
+    processingDone chan struct{}       // Closed when message processor exits
+    // Flow control counters (atomic for concurrent access)
+    droppedMessages atomic.Int64 // Messages dropped due to full queue
     // Diagnostics: per-connection counters
     msgCount   int
     reqCount   int
     eventCount int
 }
+
+type messageRequest struct {
+    data   []byte
+    remote string
+}

 // Ctx returns the listener's context, but creates a new context for each operation
 // to prevent cancellation from affecting subsequent operations
 func (l *Listener) Ctx() context.Context {
     return l.ctx
 }
+
+// DroppedMessages returns the total number of messages that were dropped
+// because the message processing queue was full.
+func (l *Listener) DroppedMessages() int {
+    return int(l.droppedMessages.Load())
+}
+
+// RemainingCapacity returns the number of slots available in the message processing queue.
+func (l *Listener) RemainingCapacity() int {
+    return cap(l.messageQueue) - len(l.messageQueue)
+}
+
+// QueueMessage queues a message for asynchronous processing.
+// Returns true if the message was queued, false if the queue was full.
+func (l *Listener) QueueMessage(data []byte, remote string) bool {
+    req := messageRequest{data: data, remote: remote}
+    select {
+    case l.messageQueue <- req:
+        return true
+    default:
+        l.droppedMessages.Add(1)
+        return false
+    }
+}

 func (l *Listener) Write(p []byte) (n int, err error) {
     // Send write request to channel - non-blocking with timeout
@@ -136,6 +171,30 @@ func (l *Listener) writeWorker() {
     }
 }
+
+// messageProcessor is the goroutine that processes messages asynchronously.
+// This prevents the websocket read loop from blocking on message processing.
+func (l *Listener) messageProcessor() {
+    defer func() {
+        close(l.processingDone)
+    }()
+    for {
+        select {
+        case <-l.ctx.Done():
+            log.D.F("ws->%s message processor context cancelled", l.remote)
+            return
+        case req, ok := <-l.messageQueue:
+            if !ok {
+                log.D.F("ws->%s message queue closed", l.remote)
+                return
+            }
+            // Process the message synchronously in this goroutine
+            l.HandleMessage(req.data, req.remote)
+        }
+    }
+}

 // getManagedACL returns the managed ACL instance if available
 func (l *Listener) getManagedACL() *database.ManagedACL {
     // Get the managed ACL instance from the ACL registry
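To make the flow control concrete, a minimal illustration of how `QueueMessage` and `DroppedMessages` interact, using a deliberately tiny queue (a test-style sketch inside the same package, not shipped code):

    l := &Listener{messageQueue: make(chan messageRequest, 1)}
    l.QueueMessage([]byte("first"), "127.0.0.1")        // queued, returns true
    ok := l.QueueMessage([]byte("second"), "127.0.0.1") // queue full
    // ok == false and l.DroppedMessages() == 1: the read loop logs the drop
    // and keeps reading rather than blocking behind a slow consumer.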


@@ -117,8 +117,22 @@ func Run(
         }
     }
+    // Initialize relay group manager
+    l.relayGroupMgr = dsync.NewRelayGroupManager(db, cfg.RelayGroupAdmins)
     // Initialize sync manager if relay peers are configured
+    var peers []string
     if len(cfg.RelayPeers) > 0 {
+        peers = cfg.RelayPeers
+    } else {
+        // Try to get peers from relay group configuration
+        if config, err := l.relayGroupMgr.FindAuthoritativeConfig(ctx); err == nil && config != nil {
+            peers = config.Relays
+            log.I.F("using relay group configuration with %d peers", len(peers))
+        }
+    }
+    if len(peers) > 0 {
         // Get relay identity for node ID
         sk, err := db.GetOrCreateRelayIdentitySecret()
         if err != nil {
@@ -132,8 +146,8 @@ func Run(
         if relayURL == "" {
             relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port)
         }
-        l.syncManager = dsync.NewManager(ctx, db, nodeID, relayURL, cfg.RelayPeers)
-        log.I.F("distributed sync manager initialized with %d peers", len(cfg.RelayPeers))
+        l.syncManager = dsync.NewManager(ctx, db, nodeID, relayURL, peers, l.relayGroupMgr, l.policyManager)
+        log.I.F("distributed sync manager initialized with %d peers", len(peers))
     }
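The peer resolution above gives explicit configuration priority over group events. Restated as a standalone helper (illustrative; `RelayGroupConfig` stands in for whatever type `FindAuthoritativeConfig` actually returns):

    func resolvePeers(explicit []string, group *RelayGroupConfig) []string {
        if len(explicit) > 0 {
            return explicit // ORLY_RELAY_PEERS always wins
        }
        if group != nil {
            return group.Relays // fall back to the authoritative group event
        }
        return nil // no peers: the sync manager is never started
    }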


@@ -52,6 +52,7 @@ type Server struct {
     policyManager *policy.P
     spiderManager *spider.Spider
     syncManager   *dsync.Manager
+    relayGroupMgr *dsync.RelayGroupManager
     blossomServer *blossom.Server
 }
@@ -249,7 +250,7 @@ func (s *Server) UserInterface() {
     // Sync endpoints for distributed synchronization
     if s.syncManager != nil {
         s.mux.HandleFunc("/api/sync/current", s.handleSyncCurrent)
-        s.mux.HandleFunc("/api/sync/fetch", s.handleSyncFetch)
+        s.mux.HandleFunc("/api/sync/event-ids", s.handleSyncEventIDs)
         log.Printf("Distributed sync API enabled at /api/sync")
     }
@@ -1015,8 +1016,8 @@ func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) {
     s.syncManager.HandleCurrentRequest(w, r)
 }
-// handleSyncFetch handles requests for events in a serial range
-func (s *Server) handleSyncFetch(w http.ResponseWriter, r *http.Request) {
+// handleSyncEventIDs handles requests for event IDs with their serial numbers
+func (s *Server) handleSyncEventIDs(w http.ResponseWriter, r *http.Request) {
     if s.syncManager == nil {
         http.Error(w, "Sync manager not initialized", http.StatusServiceUnavailable)
         return
@@ -1027,7 +1028,7 @@ func (s *Server) handleSyncEventIDs(w http.ResponseWriter, r *http.Request) {
         return
     }
-    s.syncManager.HandleFetchRequest(w, r)
+    s.syncManager.HandleEventIDsRequest(w, r)
 }

 // validatePeerRequest validates NIP-98 authentication and checks if the requesting peer is authorized
@@ -1044,21 +1045,22 @@ func (s *Server) validatePeerRequest(w http.ResponseWriter, r *http.Request) boo
         return false
     }
     // Check if this pubkey corresponds to a configured peer relay
+    if s.syncManager == nil {
+        log.Printf("Sync manager not available for peer validation")
+        http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
+        return false
+    }
-    // Extract the relay URL from the request (this should be in the request body)
-    // For now, we'll check against all configured peers
     peerPubkeyHex := hex.Enc(pubkey)
-    for range s.Config.RelayPeers {
-        // Extract pubkey from peer URL (assuming format: https://relay.example.com@pubkey)
-        // For now, check if the pubkey matches any configured admin/owner
-        // TODO: Implement proper peer identity mapping
-        for _, admin := range s.Admins {
-            if hex.Enc(admin) == peerPubkeyHex {
-                return true
-            }
-        }
-        for _, owner := range s.Owners {
-            if hex.Enc(owner) == peerPubkeyHex {
-                return true
-            }
-        }
+    // Check if this pubkey matches any of our configured peer relays' NIP-11 pubkeys
+    for _, peerURL := range s.syncManager.GetPeers() {
+        if s.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) {
+            // Also update ACL to grant admin access to this peer pubkey
+            s.updatePeerAdminACL(pubkey)
+            return true
+        }
     }
@@ -1066,3 +1068,23 @@ func (s *Server) validatePeerRequest(w http.ResponseWriter, r *http.Request) boo
     http.Error(w, "Unauthorized peer", http.StatusForbidden)
     return false
 }
+
+// updatePeerAdminACL grants admin access to peer relay identity pubkeys
+func (s *Server) updatePeerAdminACL(peerPubkey []byte) {
+    // Find the managed ACL instance and update peer admins
+    for _, aclInstance := range acl.Registry.ACL {
+        if aclInstance.Type() == "managed" {
+            if managed, ok := aclInstance.(*acl.Managed); ok {
+                // Collect all current peer pubkeys
+                var peerPubkeys [][]byte
+                for _, peerURL := range s.syncManager.GetPeers() {
+                    if pubkey, err := s.syncManager.GetPeerPubkey(peerURL); err == nil {
+                        peerPubkeys = append(peerPubkeys, []byte(pubkey))
+                    }
+                }
+                managed.UpdatePeerAdmins(peerPubkeys)
+                break
+            }
+        }
+    }
+}
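On the other side of `validatePeerRequest`, a peer must attach a NIP-98 header when it calls endpoints such as /api/sync/event-ids. A sketch of that client step, with `signEvent` as a hypothetical stand-in for however the relay identity signs events (not this repository's API):

    import "encoding/base64"

    // buildNIP98Header is an illustrative client-side helper. NIP-98 binds a
    // kind 27235 event to one URL and HTTP method via "u" and "method" tags;
    // the signed event JSON travels base64-encoded in the Authorization header.
    func buildNIP98Header(
        signEvent func(kind int, tags [][]string) ([]byte, error), // hypothetical signer
        url, method string,
    ) (string, error) {
        raw, err := signEvent(27235, [][]string{{"u", url}, {"method", method}})
        if err != nil {
            return "", err
        }
        return "Nostr " + base64.StdEncoding.EncodeToString(raw), nil
    }

The server then verifies the signature, checks the URL, method, and timestamp, and, as in the diff above, matches the signing pubkey against the cached NIP-11 pubkeys of configured peers.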