Add cluster replication configuration and enhance event handling
- Introduced support for cluster replication in the ORLY system, allowing distributed relay clusters with active replication.
- Updated the configuration to include a new option for propagating privileged events to relay peers.
- Enhanced the `ClusterManager` to manage event propagation based on the new configuration setting.
- Improved the handling of event fetching to respect the propagation settings, ensuring better privacy for privileged events.
- Updated documentation to reflect the new cluster replication features and privacy considerations.
- Bumped version to v0.24.3 to reflect these changes.
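For illustration, here is a minimal wiring sketch of the new constructor signature introduced in this commit. Only the `NewClusterManager` arguments and the `interface{ Deliver(*event.E) }` publisher requirement come from the diff below; the `cluster` import path, the `publisher` type, and the way the configuration value reaches this call are assumptions.

```go
package relay

import (
    "context"

    "next.orly.dev/pkg/cluster" // assumed import path for the ClusterManager package
    "next.orly.dev/pkg/database"
    "next.orly.dev/pkg/encoders/event"
)

// publisher stands in for the relay's existing delivery component; the cluster
// manager only requires a value satisfying interface{ Deliver(*event.E) }.
type publisher struct{}

func (p *publisher) Deliver(ev *event.E) {
    // hand a replicated event to matching local subscriptions (relay-specific)
}

// startCluster threads the new propagate-privileged setting (the config option
// added by this commit; its exact key name is not shown here) into the manager.
func startCluster(ctx context.Context, db *database.D, adminNpubs []string, propagatePrivileged bool) *cluster.ClusterManager {
    return cluster.NewClusterManager(ctx, db, adminNpubs, propagatePrivileged, &publisher{})
}
```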
@@ -14,18 +14,22 @@ import (
     "next.orly.dev/pkg/database"
     "next.orly.dev/pkg/database/indexes/types"
     "next.orly.dev/pkg/encoders/event"
     "next.orly.dev/pkg/encoders/hex"
     "next.orly.dev/pkg/encoders/kind"
 )
 
 type ClusterManager struct {
-    ctx        context.Context
-    cancel     context.CancelFunc
-    db         *database.D
-    adminNpubs []string
-    members    map[string]*ClusterMember // keyed by relay URL
-    membersMux sync.RWMutex
-    pollTicker *time.Ticker
-    pollDone   chan struct{}
-    httpClient *http.Client
+    ctx                       context.Context
+    cancel                    context.CancelFunc
+    db                        *database.D
+    adminNpubs                []string
+    members                   map[string]*ClusterMember // keyed by relay URL
+    membersMux                sync.RWMutex
+    pollTicker                *time.Ticker
+    pollDone                  chan struct{}
+    httpClient                *http.Client
+    propagatePrivilegedEvents bool
+    publisher                 interface{ Deliver(*event.E) }
 }
 
 type ClusterMember struct {
@@ -54,16 +58,18 @@ type EventInfo struct {
     Timestamp int64 `json:"timestamp"`
 }
 
-func NewClusterManager(ctx context.Context, db *database.D, adminNpubs []string) *ClusterManager {
+func NewClusterManager(ctx context.Context, db *database.D, adminNpubs []string, propagatePrivilegedEvents bool, publisher interface{ Deliver(*event.E) }) *ClusterManager {
     ctx, cancel := context.WithCancel(ctx)
 
     cm := &ClusterManager{
-        ctx: ctx,
-        cancel: cancel,
-        db: db,
-        adminNpubs: adminNpubs,
-        members: make(map[string]*ClusterMember),
-        pollDone: make(chan struct{}),
+        ctx: ctx,
+        cancel: cancel,
+        db: db,
+        adminNpubs: adminNpubs,
+        members: make(map[string]*ClusterMember),
+        pollDone: make(chan struct{}),
+        propagatePrivilegedEvents: propagatePrivilegedEvents,
+        publisher: publisher,
         httpClient: &http.Client{
             Timeout: 30 * time.Second,
         },
@@ -146,17 +152,17 @@ func (cm *ClusterManager) pollMember(member *ClusterMember) {
         return
     }
 
-        // Process fetched events
-        for _, eventInfo := range eventsResp.Events {
-            if cm.shouldFetchEvent(eventInfo) {
-                // Fetch full event via WebSocket and store it
-                if err := cm.fetchAndStoreEvent(member.WebSocketURL, eventInfo.ID); err != nil {
-                    log.W.F("failed to fetch/store event %s from %s: %v", eventInfo.ID, member.HTTPURL, err)
-                } else {
-                    log.D.F("successfully replicated event %s from %s", eventInfo.ID, member.HTTPURL)
+        // Process fetched events
+        for _, eventInfo := range eventsResp.Events {
+            if cm.shouldFetchEvent(eventInfo) {
+                // Fetch full event via WebSocket and store it
+                if err := cm.fetchAndStoreEvent(member.WebSocketURL, eventInfo.ID, cm.publisher); err != nil {
+                    log.W.F("failed to fetch/store event %s from %s: %v", eventInfo.ID, member.HTTPURL, err)
+                } else {
+                    log.D.F("successfully replicated event %s from %s", eventInfo.ID, member.HTTPURL)
                 }
             }
         }
     }
 
     // Update last serial if we processed all events
     if !eventsResp.HasMore && member.LastSerial != to {
@@ -417,17 +423,80 @@ func (cm *ClusterManager) getEventsInRangeFromDB(from, to uint64, limit int) ([]
     }
 
     // Query events by serial range
     // This is a simplified implementation - in practice you'd need to use the proper indexing
     err := cm.db.View(func(txn *badger.Txn) error {
-        // For now, return empty results as this requires more complex indexing logic
-        // TODO: Implement proper serial range querying using database indexes
+        // Iterate through event keys in the database
+        it := txn.NewIterator(badger.IteratorOptions{
+            Prefix: []byte{0}, // Event keys start with 0
+        })
+        defer it.Close()
+
+        count := 0
+        it.Seek([]byte{0})
+
+        for it.Valid() && count < limit {
+            key := it.Item().Key()
+
+            // Check if this is an event key (starts with event prefix)
+            if len(key) >= 8 && key[0] == 0 && key[1] == 0 && key[2] == 0 {
+                // Extract serial from the last 5 bytes (Uint40)
+                if len(key) >= 8 {
+                    serial := binary.BigEndian.Uint64(key[len(key)-8:]) >> 24 // Convert from Uint40
+
+                    // Check if serial is in range
+                    if serial >= from && serial <= to {
+                        // Fetch the full event to check if it's privileged
+                        serial40 := &types.Uint40{}
+                        if err := serial40.Set(serial); err != nil {
+                            continue
+                        }
+
+                        ev, err := cm.db.FetchEventBySerial(serial40)
+                        if err != nil {
+                            continue
+                        }
+
+                        // Check if we should propagate this event
+                        shouldPropagate := true
+                        if !cm.propagatePrivilegedEvents && kind.IsPrivileged(ev.Kind) {
+                            shouldPropagate = false
+                        }
+
+                        if shouldPropagate {
+                            events = append(events, EventInfo{
+                                Serial:    serial,
+                                ID:        hex.Enc(ev.ID),
+                                Timestamp: ev.CreatedAt,
+                            })
+                            count++
+                        }
+
+                        // Free the event
+                        ev.Free()
+                    }
+                }
+            }
+
+            it.Next()
+        }
+
+        // Check if there are more events
+        if it.Valid() {
+            hasMore = true
+            // Try to get the next serial
+            nextKey := it.Item().Key()
+            if len(nextKey) >= 8 && nextKey[0] == 0 && nextKey[1] == 0 && nextKey[2] == 0 {
+                nextSerial := binary.BigEndian.Uint64(nextKey[len(nextKey)-8:]) >> 24
+                nextFrom = nextSerial
+            }
+        }
+
         return nil
     })
 
     return events, hasMore, nextFrom, err
 }
 
-func (cm *ClusterManager) fetchAndStoreEvent(wsURL, eventID string) error {
+func (cm *ClusterManager) fetchAndStoreEvent(wsURL, eventID string, publisher interface{ Deliver(*event.E) }) error {
     // TODO: Implement WebSocket connection and event fetching
     // For now, this is a placeholder that assumes the event can be fetched
     // In a full implementation, this would:
@@ -435,9 +504,18 @@ func (cm *ClusterManager) fetchAndStoreEvent(wsURL, eventID string) error {
     // 2. Send a REQ message for the specific event ID
     // 3. Receive the EVENT message
     // 4. Validate and store the event in the local database
+    // 5. Propagate the event to subscribers via the publisher
 
     // Placeholder - mark as not implemented for now
     log.D.F("fetchAndStoreEvent called for %s from %s (placeholder implementation)", eventID, wsURL)
 
+    // Note: When implementing the full WebSocket fetching logic, after storing the event,
+    // the publisher should be called like this:
+    // if publisher != nil {
+    //     clonedEvent := fetchedEvent.Clone()
+    //     go publisher.Deliver(clonedEvent)
+    // }
+
     return nil // Return success for now
 }
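The numbered placeholder comments above sketch what `fetchAndStoreEvent` will eventually do. As a rough illustration of steps 1-3 only, assuming the standard Nostr REQ/EVENT/EOSE wire protocol and the third-party `github.com/gorilla/websocket` package (neither is prescribed by this commit), the fetch half might look like:

```go
package replication

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/gorilla/websocket"
)

// fetchRawEvent connects to a peer relay (step 1), sends a REQ filtered to a
// single event ID (step 2), and returns the raw event JSON from the first
// EVENT frame (step 3). Validating, storing, and handing the event to the
// publisher (steps 4-5) are relay-specific and left to the caller.
func fetchRawEvent(wsURL, eventID string) (json.RawMessage, error) {
    conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    if err != nil {
        return nil, err
    }
    defer conn.Close()

    subID := "replicate-" + eventID
    req, err := json.Marshal([]any{"REQ", subID, map[string]any{"ids": []string{eventID}}})
    if err != nil {
        return nil, err
    }
    if err := conn.WriteMessage(websocket.TextMessage, req); err != nil {
        return nil, err
    }

    conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    for {
        _, msg, err := conn.ReadMessage()
        if err != nil {
            return nil, err
        }
        var frame []json.RawMessage
        if err := json.Unmarshal(msg, &frame); err != nil || len(frame) < 2 {
            continue // ignore frames we cannot parse
        }
        var label string
        _ = json.Unmarshal(frame[0], &label)
        switch label {
        case "EVENT":
            if len(frame) >= 3 {
                return frame[2], nil // raw event JSON, ready to validate and store
            }
        case "EOSE":
            return nil, fmt.Errorf("event %s not found on %s", eventID, wsURL)
        }
    }
}
```

Once the returned JSON has been decoded into an `*event.E`, validated, and saved, a clone of the event would be handed to `publisher.Deliver`, as the in-code note above describes.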