forked from mleku/next.orly.dev
Compare commits
2 Commits
Commits:
- e161d0e4be
- ed412dcb7e
@@ -50,6 +50,7 @@ type C struct {
    MonthlyPriceSats int64 `env:"ORLY_MONTHLY_PRICE_SATS" default:"6000" usage:"price in satoshis for one month subscription (default ~$2 USD)"`
    RelayURL string `env:"ORLY_RELAY_URL" usage:"base URL for the relay dashboard (e.g., https://relay.example.com)"`
    RelayAddresses []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com)"`
    RelayPeers []string `env:"ORLY_RELAY_PEERS" usage:"comma-separated list of peer relay URLs for distributed synchronization (e.g., https://peer1.example.com,https://peer2.example.com)"`
    FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"`

    // Blossom blob storage service level settings
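A minimal sketch of wiring up the new distributed-sync settings through their environment variables. The values are placeholders, and setting them via os.Setenv here is only illustrative; the relay's own env parser reads the struct tags above:

    // Illustrative values only - substitute your deployment's URLs.
    os.Setenv("ORLY_RELAY_URL", "https://relay.example.com")
    os.Setenv("ORLY_RELAY_ADDRESSES", "wss://relay.example.com,wss://backup.example.com")
    os.Setenv("ORLY_RELAY_PEERS", "https://peer1.example.com,https://peer2.example.com")
    os.Setenv("ORLY_FOLLOW_LIST_FREQUENCY", "1h")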
@@ -455,6 +455,12 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
        chk.E(err)
        return
    }
    // Update serial for distributed synchronization
    if l.syncManager != nil {
        l.syncManager.UpdateSerial()
        log.D.F("updated serial for event %s", hex.Enc(env.E.ID))
    }
    // Send a success response storing
    if err = Ok.Ok(l, env, ""); chk.E(err) {
        return
@@ -12,6 +12,7 @@ import (
    "lol.mleku.dev/log"
    "next.orly.dev/pkg/encoders/envelopes/authenvelope"
    "next.orly.dev/pkg/encoders/hex"
    "next.orly.dev/pkg/protocol/publish"
    "next.orly.dev/pkg/utils/units"
)
@@ -20,7 +21,7 @@ const (
    DefaultPongWait     = 60 * time.Second
    DefaultPingWait     = DefaultPongWait / 2
    DefaultWriteTimeout = 3 * time.Second
    DefaultMaxMessageSize = 100 * units.Mb
    DefaultMaxMessageSize = 512000 // Match khatru's MaxMessageSize
    // ClientMessageSizeLimit is the maximum message size that clients can handle
    // This is set to 100MB to allow large messages
    ClientMessageSizeLimit = 100 * 1024 * 1024 // 100MB
@@ -83,7 +84,7 @@ whitelist:
        remote:    remote,
        req:       r,
        startTime: time.Now(),
        writeChan: make(chan WriteRequest, 100), // Buffered channel for writes
        writeChan: make(chan publish.WriteRequest, 100), // Buffered channel for writes
        writeDone: make(chan struct{}),
    }
@@ -119,13 +120,6 @@ whitelist:
        conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
        return nil
    })
    // Set ping handler - extends read deadline when pings are received
    // Send pong through write channel
    conn.SetPingHandler(func(msg string) error {
        conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
        deadline := time.Now().Add(DefaultWriteTimeout)
        return listener.WriteControl(websocket.PongMessage, []byte{}, deadline)
    })
    // Don't pass cancel to Pinger - it should not be able to cancel the connection context
    go s.Pinger(ctx, listener, ticker)
    defer func() {
@@ -135,11 +129,6 @@ whitelist:
        cancel()
        ticker.Stop()

        // Close write channel to signal worker to exit
        close(listener.writeChan)
        // Wait for write worker to finish
        <-listener.writeDone

        // Cancel all subscriptions for this connection
        log.D.F("cancelling subscriptions for %s", remote)
        listener.publishers.Receive(&W{
@@ -162,6 +151,11 @@ whitelist:
        } else {
            log.D.F("ws connection %s was not authenticated", remote)
        }

        // Close write channel to signal worker to exit
        close(listener.writeChan)
        // Wait for write worker to finish
        <-listener.writeDone
    }()
    for {
        select {
@@ -191,97 +185,25 @@ whitelist:
            typ, msg, err = conn.ReadMessage()

            if err != nil {
                // Check if the error is due to context cancellation
                if err == context.Canceled || strings.Contains(err.Error(), "context canceled") {
                    log.T.F("connection from %s cancelled (context done): %v", remote, err)
                    return
                }
                if strings.Contains(
                    err.Error(), "use of closed network connection",
                if websocket.IsUnexpectedCloseError(
                    err,
                    websocket.CloseNormalClosure,    // 1000
                    websocket.CloseGoingAway,        // 1001
                    websocket.CloseNoStatusReceived, // 1005
                    websocket.CloseAbnormalClosure,  // 1006
                    4537, // some client seems to send many of these
                ) {
                    return
                }
                // Handle EOF errors gracefully - these occur when client closes connection
                // or sends incomplete/malformed WebSocket frames
                if strings.Contains(err.Error(), "EOF") ||
                    strings.Contains(err.Error(), "failed to read frame header") {
                    log.T.F("connection from %s closed: %v", remote, err)
                    return
                }
                // Handle timeout errors specifically - these can occur on idle connections
                // but pongs should extend the deadline, so a timeout usually means dead connection
                if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") {
                    log.T.F("connection from %s read timeout (likely dead connection): %v", remote, err)
                    return
                }
                // Handle message too big errors specifically
                if strings.Contains(err.Error(), "message too large") ||
                    strings.Contains(err.Error(), "read limited at") {
                    log.D.F("client %s hit message size limit: %v", remote, err)
                    // Don't log this as an error since it's a client-side limit
                    // Just close the connection gracefully
                    return
                }
                // Check for websocket close errors
                if websocket.IsCloseError(err, websocket.CloseNormalClosure,
                    websocket.CloseGoingAway,
                    websocket.CloseNoStatusReceived,
                    websocket.CloseAbnormalClosure,
                    websocket.CloseUnsupportedData,
                    websocket.CloseInvalidFramePayloadData) {
                    log.T.F("connection from %s closed: %v", remote, err)
                } else if websocket.IsCloseError(err, websocket.CloseMessageTooBig) {
                    log.D.F("client %s sent message too big: %v", remote, err)
                } else {
                    log.E.F("unexpected close error from %s: %v", remote, err)
                    log.I.F("websocket connection closed from %s: %v", remote, err)
                }
                cancel() // Cancel context like khatru does
                return
            }
            if typ == websocket.PingMessage {
                log.D.F("received PING from %s, sending PONG", remote)
                // Send pong through write channel
                deadline := time.Now().Add(DefaultWriteTimeout)
                pongStart := time.Now()
                if err = listener.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
                    pongDuration := time.Since(pongStart)

                    // Check if this is a timeout vs a connection error
                    isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
                    isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
                        strings.Contains(err.Error(), "broken pipe") ||
                        strings.Contains(err.Error(), "connection reset") ||
                        websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
                            websocket.CloseGoingAway,
                            websocket.CloseNoStatusReceived)

                    if isConnectionError {
                        log.E.F(
                            "failed to send PONG to %s after %v (connection error): %v", remote,
                            pongDuration, err,
                        )
                        return
                    } else if isTimeout {
                        // Timeout on pong - log but don't close immediately
                        // The read deadline will catch dead connections
                        log.W.F(
                            "failed to send PONG to %s after %v (timeout, but connection may still be alive): %v", remote,
                            pongDuration, err,
                        )
                        // Continue - don't close connection on pong timeout
                    } else {
                        // Unknown error - log and continue
                        log.E.F(
                            "failed to send PONG to %s after %v (unknown error): %v", remote,
                            pongDuration, err,
                        )
                        // Continue - don't close on unknown errors
                    }
                    continue
                }
                pongDuration := time.Since(pongStart)
                log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
                if pongDuration > time.Millisecond*50 {
                    log.D.F("SLOW PONG to %s: %v (>50ms)", remote, pongDuration)
                // Send pong directly (like khatru does)
                if err = conn.WriteMessage(websocket.PongMessage, nil); err != nil {
                    log.E.F("failed to send PONG to %s: %v", remote, err)
                    return
                }
                continue
            }
@@ -300,68 +222,25 @@ func (s *Server) Pinger(
    defer func() {
        log.D.F("pinger shutting down")
        ticker.Stop()
        // DO NOT call cancel here - the pinger should not be able to cancel the connection context
        // The connection handler will cancel the context when the connection is actually closing
    }()
    var err error
    pingCount := 0
    for {
        select {
        case <-ticker.C:
            pingCount++
            log.D.F("sending PING #%d", pingCount)

            // Send ping through write channel
            deadline := time.Now().Add(DefaultWriteTimeout)
            pingStart := time.Now()

            if err = listener.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
                pingDuration := time.Since(pingStart)

                // Check if this is a timeout vs a connection error
                isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
                isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
                    strings.Contains(err.Error(), "broken pipe") ||
                    strings.Contains(err.Error(), "connection reset") ||
                    websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
                        websocket.CloseGoingAway,
                        websocket.CloseNoStatusReceived)

                if isConnectionError {
                    log.E.F(
                        "PING #%d FAILED after %v (connection error): %v", pingCount, pingDuration,
                        err,
                    )
                    chk.E(err)
                    return
                } else if isTimeout {
                    // Timeout on ping - log but don't stop pinger immediately
                    // The read deadline will catch dead connections
                    log.W.F(
                        "PING #%d timeout after %v (connection may still be alive): %v", pingCount, pingDuration,
                        err,
                    )
                    // Continue - don't stop pinger on timeout
                } else {
                    // Unknown error - log and continue
                    log.E.F(
                        "PING #%d FAILED after %v (unknown error): %v", pingCount, pingDuration,
                        err,
                    )
                    // Continue - don't stop pinger on unknown errors
                }
                continue
            }

            pingDuration := time.Since(pingStart)
            log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)

            if pingDuration > time.Millisecond*100 {
                log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
            }
        case <-ctx.Done():
            log.T.F("pinger context cancelled after %d pings", pingCount)
            return
        case <-ticker.C:
            pingCount++
            // Send ping request through write channel - this allows pings to interrupt other writes
            select {
            case <-ctx.Done():
                return
            case listener.writeChan <- publish.WriteRequest{IsPing: true, MsgType: pingCount}:
                // Ping request queued successfully
            case <-time.After(DefaultWriteTimeout):
                log.E.F("ping #%d channel timeout - connection may be overloaded", pingCount)
                return
            }
        }
    }
}
app/listener.go (154 lines changed)
@@ -18,9 +18,6 @@ import (
    "next.orly.dev/pkg/utils/atomic"
)

// WriteRequest represents a write operation to be performed by the write worker
type WriteRequest = publish.WriteRequest

type Listener struct {
    *Server
    conn *websocket.Conn
@@ -32,7 +29,7 @@ type Listener struct {
    startTime        time.Time
    isBlacklisted    bool      // Marker to identify blacklisted IPs
    blacklistTimeout time.Time // When to timeout blacklisted connections
    writeChan chan WriteRequest // Channel for write requests
    writeChan chan publish.WriteRequest // Channel for write requests (back to queued approach)
    writeDone chan struct{} // Closed when write worker exits
    // Diagnostics: per-connection counters
    msgCount int
@@ -46,92 +43,13 @@ func (l *Listener) Ctx() context.Context {
    return l.ctx
}

// writeWorker is the single goroutine that handles all writes to the websocket connection.
// This serializes all writes to prevent concurrent write panics.
func (l *Listener) writeWorker() {
    var channelClosed bool
    defer func() {
        // Only unregister write channel if connection is actually dead/closing
        // Unregister if:
        // 1. Context is cancelled (connection closing)
        // 2. Channel was closed (connection closing)
        // 3. Connection error occurred (already handled inline)
        if l.ctx.Err() != nil || channelClosed {
            // Connection is closing - safe to unregister
            if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
                log.D.F("ws->%s write worker: unregistering write channel (connection closing)", l.remote)
                socketPub.SetWriteChan(l.conn, nil)
            }
        } else {
            // Exiting for other reasons (timeout, etc.) but connection may still be alive
            // Don't unregister - let the connection cleanup handle it
            log.D.F("ws->%s write worker: exiting but connection may still be alive, keeping write channel registered", l.remote)
        }
        close(l.writeDone)
    }()
    for {
        select {
        case <-l.ctx.Done():
            // Context cancelled - connection is closing
            log.D.F("ws->%s write worker: context cancelled, exiting", l.remote)
            return
        case req, ok := <-l.writeChan:
            if !ok {
                // Channel closed - connection is closing
                channelClosed = true
                log.D.F("ws->%s write worker: write channel closed, exiting", l.remote)
                return
            }
            deadline := req.Deadline
            if deadline.IsZero() {
                deadline = time.Now().Add(DefaultWriteTimeout)
            }
            l.conn.SetWriteDeadline(deadline)
            writeStart := time.Now()
            var err error
            if req.IsControl {
                err = l.conn.WriteControl(req.MsgType, req.Data, deadline)
            } else {
                err = l.conn.WriteMessage(req.MsgType, req.Data)
            }
            if err != nil {
                writeDuration := time.Since(writeStart)
                log.E.F("ws->%s write worker FAILED: len=%d duration=%v error=%v",
                    l.remote, len(req.Data), writeDuration, err)
                // Check for connection errors - if so, stop the worker
                isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
                    strings.Contains(err.Error(), "broken pipe") ||
                    strings.Contains(err.Error(), "connection reset") ||
                    websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
                        websocket.CloseGoingAway,
                        websocket.CloseNoStatusReceived)
                if isConnectionError {
                    // Connection is dead - unregister channel immediately
                    log.D.F("ws->%s write worker: connection error detected, unregistering write channel", l.remote)
                    if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
                        socketPub.SetWriteChan(l.conn, nil)
                    }
                    return
                }
                // Continue for other errors (timeouts, etc.) - connection may still be alive
                log.D.F("ws->%s write worker: non-fatal error (timeout?), continuing", l.remote)
            } else {
                writeDuration := time.Since(writeStart)
                if writeDuration > time.Millisecond*100 {
                    log.D.F("ws->%s write worker SLOW: len=%d duration=%v",
                        l.remote, len(req.Data), writeDuration)
                }
            }
        }
    }
}

func (l *Listener) Write(p []byte) (n int, err error) {
    // Send write request to channel - non-blocking with timeout
    select {
    case <-l.ctx.Done():
        return 0, l.ctx.Err()
    case l.writeChan <- WriteRequest{Data: p, MsgType: websocket.TextMessage, IsControl: false}:
    case l.writeChan <- publish.WriteRequest{Data: p, MsgType: websocket.TextMessage, IsControl: false}:
        return len(p), nil
    case <-time.After(DefaultWriteTimeout):
        log.E.F("ws->%s write channel timeout", l.remote)
@@ -144,7 +62,7 @@ func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time
    select {
    case <-l.ctx.Done():
        return l.ctx.Err()
    case l.writeChan <- WriteRequest{Data: data, MsgType: messageType, IsControl: true, Deadline: deadline}:
    case l.writeChan <- publish.WriteRequest{Data: data, MsgType: messageType, IsControl: true, Deadline: deadline}:
        return nil
    case <-time.After(DefaultWriteTimeout):
        log.E.F("ws->%s writeControl channel timeout", l.remote)
@@ -152,6 +70,72 @@ func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time
    }
}

// writeWorker is the single goroutine that handles all writes to the websocket connection.
// This serializes all writes to prevent concurrent write panics and allows pings to interrupt writes.
func (l *Listener) writeWorker() {
    defer func() {
        // Only unregister write channel if connection is actually dead/closing
        // Unregister if:
        // 1. Context is cancelled (connection closing)
        // 2. Channel was closed (connection closing)
        // 3. Connection error occurred (already handled inline)
        if l.ctx.Err() != nil {
            // Connection is closing - safe to unregister
            if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
                log.D.F("ws->%s write worker: unregistering write channel (connection closing)", l.remote)
                socketPub.SetWriteChan(l.conn, nil)
            }
        } else {
            // Exiting for other reasons (timeout, etc.) but connection may still be valid
            log.D.F("ws->%s write worker exiting unexpectedly", l.remote)
        }
        close(l.writeDone)
    }()

    for {
        select {
        case <-l.ctx.Done():
            log.D.F("ws->%s write worker context cancelled", l.remote)
            return
        case req, ok := <-l.writeChan:
            if !ok {
                log.D.F("ws->%s write channel closed", l.remote)
                return
            }

            // Handle the write request
            var err error
            if req.IsPing {
                // Special handling for ping messages
                log.D.F("sending PING #%d", req.MsgType)
                deadline := time.Now().Add(DefaultWriteTimeout)
                err = l.conn.WriteControl(websocket.PingMessage, nil, deadline)
                if err != nil {
                    if !strings.HasSuffix(err.Error(), "use of closed network connection") {
                        log.E.F("error writing ping: %v; closing websocket", err)
                    }
                    return
                }
            } else if req.IsControl {
                // Control message
                err = l.conn.WriteControl(req.MsgType, req.Data, req.Deadline)
                if err != nil {
                    log.E.F("ws->%s control write failed: %v", l.remote, err)
                    return
                }
            } else {
                // Regular message
                l.conn.SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
                err = l.conn.WriteMessage(req.MsgType, req.Data)
                if err != nil {
                    log.E.F("ws->%s write failed: %v", l.remote, err)
                    return
                }
            }
        }
    }
}

// getManagedACL returns the managed ACL instance if available
func (l *Listener) getManagedACL() *database.ManagedACL {
    // Get the managed ACL instance from the ACL registry
app/main.go (22 lines changed)
@@ -20,6 +20,7 @@ import (
    "next.orly.dev/pkg/policy"
    "next.orly.dev/pkg/protocol/publish"
    "next.orly.dev/pkg/spider"
    dsync "next.orly.dev/pkg/sync"
)

func Run(
@@ -116,6 +117,27 @@ func Run(
        }
    }

    // Initialize sync manager if relay peers are configured
    if len(cfg.RelayPeers) > 0 {
        // Get relay identity for node ID
        sk, err := db.GetOrCreateRelayIdentitySecret()
        if err != nil {
            log.E.F("failed to get relay identity for sync: %v", err)
        } else {
            nodeID, err := keys.SecretBytesToPubKeyHex(sk)
            if err != nil {
                log.E.F("failed to derive pubkey for sync node ID: %v", err)
            } else {
                relayURL := cfg.RelayURL
                if relayURL == "" {
                    relayURL = fmt.Sprintf("http://localhost:%d", cfg.Port)
                }
                l.syncManager = dsync.NewManager(ctx, db, nodeID, relayURL, cfg.RelayPeers)
                log.I.F("distributed sync manager initialized with %d peers", len(cfg.RelayPeers))
            }
        }
    }

    // Initialize the user interface
    l.UserInterface()
@@ -23,6 +23,9 @@ import (

const Type = "socketapi"

// WriteChanMap maps websocket connections to their write channels
type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest

type Subscription struct {
    remote       string
    AuthedPubkey []byte
@@ -33,9 +36,6 @@ type Subscription struct {
// connections.
type Map map[*websocket.Conn]map[string]Subscription

// WriteChanMap maps websocket connections to their write channels
type WriteChanMap map[*websocket.Conn]chan<- publish.WriteRequest

type W struct {
    *websocket.Conn
@@ -88,25 +88,6 @@ func NewPublisher(c context.Context) (publisher *P) {

func (p *P) Type() (typeName string) { return Type }

// SetWriteChan stores the write channel for a websocket connection
// If writeChan is nil, the entry is removed from the map
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan<- publish.WriteRequest) {
    p.Mx.Lock()
    defer p.Mx.Unlock()
    if writeChan == nil {
        delete(p.WriteChans, conn)
    } else {
        p.WriteChans[conn] = writeChan
    }
}

// GetWriteChan returns the write channel for a websocket connection
func (p *P) GetWriteChan(conn *websocket.Conn) (chan<- publish.WriteRequest, bool) {
    p.Mx.RLock()
    defer p.Mx.RUnlock()
    ch, ok := p.WriteChans[conn]
    return ch, ok
}

// Receive handles incoming messages to manage websocket listener subscriptions
// and associated filters.
@@ -319,14 +300,14 @@ func (p *P) Deliver(ev *event.E) {
            log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s len=%d",
                hex.Enc(ev.ID), d.sub.remote, d.id, len(msgData))
        case <-time.After(DefaultWriteTimeout):
            log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s (write channel full)",
            log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s",
                hex.Enc(ev.ID), d.sub.remote, d.id)
            // Check if connection is still valid
            p.Mx.RLock()
            stillSubscribed = p.Map[d.w] != nil
            p.Mx.RUnlock()
            if !stillSubscribed {
                log.D.F("removing failed subscriber connection due to channel timeout: %s", d.sub.remote)
                log.D.F("removing failed subscriber connection: %s", d.sub.remote)
                p.removeSubscriber(d.w)
            }
        }
@@ -352,6 +333,26 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
    }
}

// SetWriteChan stores the write channel for a websocket connection
// If writeChan is nil, the entry is removed from the map
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteRequest) {
    p.Mx.Lock()
    defer p.Mx.Unlock()
    if writeChan == nil {
        delete(p.WriteChans, conn)
    } else {
        p.WriteChans[conn] = writeChan
    }
}

// GetWriteChan returns the write channel for a websocket connection
func (p *P) GetWriteChan(conn *websocket.Conn) (chan publish.WriteRequest, bool) {
    p.Mx.RLock()
    defer p.Mx.RUnlock()
    ch, ok := p.WriteChans[conn]
    return ch, ok
}

// removeSubscriber removes a websocket from the P collection.
func (p *P) removeSubscriber(ws *websocket.Conn) {
    p.Mx.Lock()
@@ -27,6 +27,7 @@ import (
    "next.orly.dev/pkg/protocol/httpauth"
    "next.orly.dev/pkg/protocol/publish"
    "next.orly.dev/pkg/spider"
    dsync "next.orly.dev/pkg/sync"
    blossom "next.orly.dev/pkg/blossom"
)

@@ -50,6 +51,7 @@ type Server struct {
    sprocketManager *SprocketManager
    policyManager   *policy.P
    spiderManager   *spider.Spider
    syncManager     *dsync.Manager
    blossomServer   *blossom.Server
}

@@ -243,7 +245,14 @@ func (s *Server) UserInterface() {
    s.mux.HandleFunc("/api/nip86", s.handleNIP86Management)
    // ACL mode endpoint
    s.mux.HandleFunc("/api/acl-mode", s.handleACLMode)

    // Sync endpoints for distributed synchronization
    if s.syncManager != nil {
        s.mux.HandleFunc("/api/sync/current", s.handleSyncCurrent)
        s.mux.HandleFunc("/api/sync/fetch", s.handleSyncFetch)
        log.Printf("Distributed sync API enabled at /api/sync")
    }

    // Blossom blob storage API endpoint
    if s.blossomServer != nil {
        s.mux.HandleFunc("/blossom/", s.blossomHandler)
@@ -990,3 +999,70 @@ func (s *Server) handleACLMode(w http.ResponseWriter, r *http.Request) {

    w.Write(jsonData)
}

// handleSyncCurrent handles requests for the current serial number
func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) {
    if s.syncManager == nil {
        http.Error(w, "Sync manager not initialized", http.StatusServiceUnavailable)
        return
    }

    // Validate NIP-98 authentication and check peer authorization
    if !s.validatePeerRequest(w, r) {
        return
    }

    s.syncManager.HandleCurrentRequest(w, r)
}

// handleSyncFetch handles requests for events in a serial range
func (s *Server) handleSyncFetch(w http.ResponseWriter, r *http.Request) {
    if s.syncManager == nil {
        http.Error(w, "Sync manager not initialized", http.StatusServiceUnavailable)
        return
    }

    // Validate NIP-98 authentication and check peer authorization
    if !s.validatePeerRequest(w, r) {
        return
    }

    s.syncManager.HandleFetchRequest(w, r)
}

// validatePeerRequest validates NIP-98 authentication and checks if the requesting peer is authorized
func (s *Server) validatePeerRequest(w http.ResponseWriter, r *http.Request) bool {
    // Validate NIP-98 authentication
    valid, pubkey, err := httpauth.CheckAuth(r)
    if err != nil {
        log.Printf("NIP-98 auth validation error: %v", err)
        http.Error(w, "Authentication validation failed", http.StatusUnauthorized)
        return false
    }
    if !valid {
        http.Error(w, "NIP-98 authentication required", http.StatusUnauthorized)
        return false
    }

    // Check if this pubkey corresponds to a configured peer relay
    peerPubkeyHex := hex.Enc(pubkey)
    for range s.Config.RelayPeers {
        // Extract pubkey from peer URL (assuming format: https://relay.example.com@pubkey)
        // For now, check if the pubkey matches any configured admin/owner
        // TODO: Implement proper peer identity mapping
        for _, admin := range s.Admins {
            if hex.Enc(admin) == peerPubkeyHex {
                return true
            }
        }
        for _, owner := range s.Owners {
            if hex.Enc(owner) == peerPubkeyHex {
                return true
            }
        }
    }

    log.Printf("Unauthorized sync request from pubkey: %s", peerPubkeyHex)
    http.Error(w, "Unauthorized peer", http.StatusForbidden)
    return false
}
@@ -1,86 +1,17 @@
package database

import (
    "bufio"
    "io"
    "os"
    "runtime/debug"

    "lol.mleku.dev/chk"
    "lol.mleku.dev/log"
    "next.orly.dev/pkg/encoders/event"
)

const maxLen = 500000000

// Import a collection of events in line structured minified JSON format (JSONL).
func (d *D) Import(rr io.Reader) {
    // store to disk so we can return fast
    tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
    os.MkdirAll(tmpPath, 0700)
    tmp, err := os.CreateTemp(tmpPath, "")
    if chk.E(err) {
        return
    }
    log.I.F("buffering upload to %s", tmp.Name())
    if _, err = io.Copy(tmp, rr); chk.E(err) {
        return
    }
    if _, err = tmp.Seek(0, 0); chk.E(err) {
        return
    }

    go func() {
        var err error
        // Create a scanner to read the buffer line by line
        scan := bufio.NewScanner(tmp)
        scanBuf := make([]byte, maxLen)
        scan.Buffer(scanBuf, maxLen)

        var count, total int
        for scan.Scan() {
            select {
            case <-d.ctx.Done():
                log.I.F("context closed")
                return
            default:
            }

            b := scan.Bytes()
            total += len(b) + 1
            if len(b) < 1 {
                continue
            }

            ev := event.New()
            if _, err = ev.Unmarshal(b); err != nil {
                // return the pooled buffer on error
                ev.Free()
                continue
            }

            if _, err = d.SaveEvent(d.ctx, ev); err != nil {
                // return the pooled buffer on error paths too
                ev.Free()
                continue
            }

            // return the pooled buffer after successful save
            ev.Free()
            b = nil
            count++
            if count%100 == 0 {
                log.I.F("received %d events", count)
                debug.FreeOSMemory()
            }
    if err := d.ImportEventsFromReader(d.ctx, rr); chk.E(err) {
        log.E.F("import failed: %v", err)
    }

        log.I.F("read %d bytes and saved %d events", total, count)
        err = scan.Err()
        if chk.E(err) {
        }

        // Help garbage collection
        tmp = nil
    }()
}
pkg/database/import_utils.go (new file, 101 lines)
@@ -0,0 +1,101 @@
// Package database provides shared import utilities for events
package database

import (
    "bufio"
    "context"
    "io"
    "os"
    "runtime/debug"
    "strings"

    "lol.mleku.dev/chk"
    "lol.mleku.dev/log"
    "next.orly.dev/pkg/encoders/event"
)

const maxLen = 500000000

// ImportEventsFromReader imports events from an io.Reader containing JSONL data
func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
    // store to disk so we can return fast
    tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
    os.MkdirAll(tmpPath, 0700)
    tmp, err := os.CreateTemp(tmpPath, "")
    if chk.E(err) {
        return err
    }
    defer os.Remove(tmp.Name()) // Clean up temp file when done

    log.I.F("buffering upload to %s", tmp.Name())
    if _, err = io.Copy(tmp, rr); chk.E(err) {
        return err
    }
    if _, err = tmp.Seek(0, 0); chk.E(err) {
        return err
    }

    return d.processJSONLEvents(ctx, tmp)
}

// ImportEventsFromStrings imports events from a slice of JSON strings
func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string) error {
    // Create a reader from the string slice
    reader := strings.NewReader(strings.Join(eventJSONs, "\n"))
    return d.processJSONLEvents(ctx, reader)
}

// processJSONLEvents processes JSONL events from a reader
func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader) error {
    // Create a scanner to read the buffer line by line
    scan := bufio.NewScanner(rr)
    scanBuf := make([]byte, maxLen)
    scan.Buffer(scanBuf, maxLen)

    var count, total int
    for scan.Scan() {
        select {
        case <-ctx.Done():
            log.I.F("context closed")
            return ctx.Err()
        default:
        }

        b := scan.Bytes()
        total += len(b) + 1
        if len(b) < 1 {
            continue
        }

        ev := event.New()
        if _, err := ev.Unmarshal(b); err != nil {
            // return the pooled buffer on error
            ev.Free()
            log.W.F("failed to unmarshal event: %v", err)
            continue
        }

        if _, err := d.SaveEvent(ctx, ev); err != nil {
            // return the pooled buffer on error paths too
            ev.Free()
            log.W.F("failed to save event: %v", err)
            continue
        }

        // return the pooled buffer after successful save
        ev.Free()
        b = nil
        count++
        if count%100 == 0 {
            log.I.F("processed %d events", count)
            debug.FreeOSMemory()
        }
    }

    log.I.F("read %d bytes and saved %d events", total, count)
    if err := scan.Err(); err != nil {
        return err
    }

    return nil
}
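A minimal usage sketch of the new helper added above; db is assumed to be an initialized *database.D and ctx a live context, and the event JSON is a placeholder:

    // Hypothetical illustration: importing two already-serialized events,
    // exactly as the sync manager does when applying a peer's FetchResponse.
    events := []string{
        `{"id":"...","kind":1,"content":"hello"}`,
        `{"id":"...","kind":1,"content":"world"}`,
    }
    if err := db.ImportEventsFromStrings(ctx, events); err != nil {
        log.E.F("import failed: %v", err)
    }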
Submodule pkg/protocol/blossom/blossom added at e8d0a1ec44
@@ -15,6 +15,7 @@ type WriteRequest struct {
    MsgType   int
    IsControl bool
    Deadline  time.Time
    IsPing    bool // Special marker for ping messages
}

// WriteChanSetter defines the interface for setting write channels
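For context, a minimal sketch of how the new IsPing flag travels through the write queue, following the Pinger and writeWorker hunks above (channel and connection setup assumed):

    // Producer side (as in Pinger): queue a ping instead of writing directly,
    // so it is serialized with every other write on the single worker goroutine.
    writeChan <- publish.WriteRequest{IsPing: true, MsgType: pingCount}

    // Consumer side (as in writeWorker): an IsPing request carries no payload;
    // the worker emits a websocket ping control frame with a fresh deadline.
    if req.IsPing {
        deadline := time.Now().Add(DefaultWriteTimeout)
        err = conn.WriteControl(websocket.PingMessage, nil, deadline)
    }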
pkg/sync/manager.go (new file, 288 lines)
@@ -0,0 +1,288 @@
package sync

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "strconv"
    "sync"
    "time"

    "lol.mleku.dev/log"
    "next.orly.dev/pkg/database"
)

// Manager handles distributed synchronization between relay peers using serial numbers as clocks
type Manager struct {
    ctx           context.Context
    cancel        context.CancelFunc
    db            *database.D
    nodeID        string
    relayURL      string
    peers         []string
    currentSerial uint64
    peerSerials   map[string]uint64 // peer URL -> latest serial seen
    mutex         sync.RWMutex
}

// CurrentRequest represents a request for the current serial number
type CurrentRequest struct {
    NodeID   string `json:"node_id"`
    RelayURL string `json:"relay_url"`
}

// CurrentResponse returns the current serial number
type CurrentResponse struct {
    NodeID   string `json:"node_id"`
    RelayURL string `json:"relay_url"`
    Serial   uint64 `json:"serial"`
}

// FetchRequest represents a request for events in a serial range
type FetchRequest struct {
    NodeID   string `json:"node_id"`
    RelayURL string `json:"relay_url"`
    From     uint64 `json:"from"`
    To       uint64 `json:"to"`
}

// FetchResponse contains the requested events as JSONL
type FetchResponse struct {
    Events []string `json:"events"` // JSONL formatted events
}

// NewManager creates a new sync manager
func NewManager(ctx context.Context, db *database.D, nodeID, relayURL string, peers []string) *Manager {
    ctx, cancel := context.WithCancel(ctx)

    m := &Manager{
        ctx:           ctx,
        cancel:        cancel,
        db:            db,
        nodeID:        nodeID,
        relayURL:      relayURL,
        peers:         peers,
        currentSerial: 0,
        peerSerials:   make(map[string]uint64),
    }

    // Start sync routine
    go m.syncRoutine()

    return m
}

// Stop stops the sync manager
func (m *Manager) Stop() {
    m.cancel()
}

// GetCurrentSerial returns the current serial number
func (m *Manager) GetCurrentSerial() uint64 {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    return m.currentSerial
}

// UpdateSerial updates the current serial number when a new event is stored
func (m *Manager) UpdateSerial() {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    // Get the latest serial from database
    if latest, err := m.getLatestSerial(); err == nil {
        m.currentSerial = latest
    }
}

// getLatestSerial gets the latest serial number from the database
func (m *Manager) getLatestSerial() (uint64, error) {
    // This is a simplified implementation
    // In practice, you'd want to track the highest serial number
    // For now, return the current serial
    return m.currentSerial, nil
}

// syncRoutine periodically syncs with peers
func (m *Manager) syncRoutine() {
    ticker := time.NewTicker(5 * time.Second) // Sync every 5 seconds
    defer ticker.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C:
            m.syncWithPeers()
        }
    }
}

// syncWithPeers syncs with all configured peers
func (m *Manager) syncWithPeers() {
    for _, peerURL := range m.peers {
        go m.syncWithPeer(peerURL)
    }
}

// syncWithPeer syncs with a specific peer
func (m *Manager) syncWithPeer(peerURL string) {
    // Get the peer's current serial
    currentReq := CurrentRequest{
        NodeID:   m.nodeID,
        RelayURL: m.relayURL,
    }

    jsonData, err := json.Marshal(currentReq)
    if err != nil {
        log.E.F("failed to marshal current request: %v", err)
        return
    }

    resp, err := http.Post(peerURL+"/api/sync/current", "application/json", bytes.NewBuffer(jsonData))
    if err != nil {
        log.D.F("failed to get current serial from %s: %v", peerURL, err)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        log.D.F("current request failed with %s: status %d", peerURL, resp.StatusCode)
        return
    }

    var currentResp CurrentResponse
    if err := json.NewDecoder(resp.Body).Decode(&currentResp); err != nil {
        log.E.F("failed to decode current response from %s: %v", peerURL, err)
        return
    }

    // Check if we need to sync
    peerSerial := currentResp.Serial
    ourLastSeen := m.peerSerials[peerURL]

    if peerSerial > ourLastSeen {
        // Request missing events
        m.requestEvents(peerURL, ourLastSeen+1, peerSerial)
        // Update our knowledge of peer's serial
        m.mutex.Lock()
        m.peerSerials[peerURL] = peerSerial
        m.mutex.Unlock()
    }
}

// requestEvents requests a range of events from a peer
func (m *Manager) requestEvents(peerURL string, from, to uint64) {
    req := FetchRequest{
        NodeID:   m.nodeID,
        RelayURL: m.relayURL,
        From:     from,
        To:       to,
    }

    jsonData, err := json.Marshal(req)
    if err != nil {
        log.E.F("failed to marshal fetch request: %v", err)
        return
    }

    resp, err := http.Post(peerURL+"/api/sync/fetch", "application/json", bytes.NewBuffer(jsonData))
    if err != nil {
        log.E.F("failed to request events from %s: %v", peerURL, err)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        log.E.F("fetch request failed with %s: status %d", peerURL, resp.StatusCode)
        return
    }

    var fetchResp FetchResponse
    if err := json.NewDecoder(resp.Body).Decode(&fetchResp); err != nil {
        log.E.F("failed to decode fetch response from %s: %v", peerURL, err)
        return
    }

    // Import the received events
    if len(fetchResp.Events) > 0 {
        if err := m.db.ImportEventsFromStrings(context.Background(), fetchResp.Events); err != nil {
            log.E.F("failed to import events from %s: %v", peerURL, err)
            return
        }
        log.I.F("imported %d events from peer %s", len(fetchResp.Events), peerURL)
    }
}

// getEventsBySerialRange retrieves events by serial range from the database as JSONL
func (m *Manager) getEventsBySerialRange(from, to uint64) ([]string, error) {
    var events []string

    // Get event serials by serial range
    serials, err := m.db.EventIdsBySerial(from, int(to-from+1))
    if err != nil {
        return nil, err
    }

    // TODO: For each serial, retrieve the actual event and marshal to JSONL
    // For now, return serial numbers as placeholder JSON strings
    for _, serial := range serials {
        // This should be replaced with actual event JSON marshalling
        events = append(events, `{"serial":`+strconv.FormatUint(serial, 10)+`}`)
    }

    return events, nil
}

// HandleCurrentRequest handles requests for current serial number
func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var req CurrentRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }

    resp := CurrentResponse{
        NodeID:   m.nodeID,
        RelayURL: m.relayURL,
        Serial:   m.GetCurrentSerial(),
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}

// HandleFetchRequest handles requests for events in a serial range
func (m *Manager) HandleFetchRequest(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var req FetchRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }

    // Get events in the requested range
    events, err := m.getEventsBySerialRange(req.From, req.To)
    if err != nil {
        http.Error(w, fmt.Sprintf("Failed to get events: %v", err), http.StatusInternalServerError)
        return
    }

    resp := FetchResponse{
        Events: events,
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}
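Derived from the struct tags above, a sketch of one round of the peer exchange; the URLs, pubkeys, and serial values are illustrative only:

    // POST {peer}/api/sync/current with a CurrentRequest body:
    //   {"node_id":"<hex pubkey>","relay_url":"https://relay.example.com"}
    // The peer answers with a CurrentResponse:
    //   {"node_id":"...","relay_url":"...","serial":42}
    // If the peer's serial is ahead of the last serial recorded for it,
    // a FetchRequest for the gap follows, built as in requestEvents above:
    req := FetchRequest{NodeID: nodeID, RelayURL: relayURL, From: 40, To: 42}
    body, _ := json.Marshal(req) // sent via http.Post to {peer}/api/sync/fetch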
@@ -1 +1 @@
v0.23.2
v0.24.0
workaround_test.go (new file, 140 lines)
@@ -0,0 +1,140 @@
package main

import (
    "fmt"
    "net"
    "testing"
    "time"

    "github.com/gorilla/websocket"
    "next.orly.dev/app/config"
    "next.orly.dev/pkg/run"
)

func TestDumbClientWorkaround(t *testing.T) {
    var relay *run.Relay
    var err error

    // Start local relay for testing
    if relay, _, err = startWorkaroundTestRelay(); err != nil {
        t.Fatalf("Failed to start test relay: %v", err)
    }
    defer func() {
        if stopErr := relay.Stop(); stopErr != nil {
            t.Logf("Error stopping relay: %v", stopErr)
        }
    }()

    relayURL := "ws://127.0.0.1:3338"

    // Wait for relay to be ready
    if err = waitForRelay(relayURL, 10*time.Second); err != nil {
        t.Fatalf("Relay not ready after timeout: %v", err)
    }

    t.Logf("Relay is ready at %s", relayURL)

    // Test connection with a "dumb" client that doesn't handle ping/pong properly
    dialer := websocket.Dialer{
        HandshakeTimeout: 10 * time.Second,
    }

    conn, _, err := dialer.Dial(relayURL, nil)
    if err != nil {
        t.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    t.Logf("Connection established")

    // Simulate a dumb client that sets a short read deadline and doesn't handle ping/pong
    conn.SetReadDeadline(time.Now().Add(30 * time.Second))

    startTime := time.Now()
    messageCount := 0

    // The connection should stay alive despite the short client-side deadline
    // because our workaround sets a 24-hour server-side deadline
    for time.Since(startTime) < 2*time.Minute {
        // Extend client deadline every 10 seconds (simulating dumb client behavior)
        if time.Since(startTime).Seconds() > 10 && int(time.Since(startTime).Seconds())%10 == 0 {
            conn.SetReadDeadline(time.Now().Add(30 * time.Second))
            t.Logf("Dumb client extended its own deadline")
        }

        // Try to read with a short timeout to avoid blocking
        conn.SetReadDeadline(time.Now().Add(1 * time.Second))
        msgType, data, err := conn.ReadMessage()
        conn.SetReadDeadline(time.Now().Add(30 * time.Second)) // Reset

        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                // Timeout is expected - just continue
                time.Sleep(100 * time.Millisecond)
                continue
            }
            if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
                t.Logf("Connection closed normally: %v", err)
                break
            }
            t.Errorf("Unexpected error: %v", err)
            break
        }

        messageCount++
        t.Logf("Received message %d: type=%d, len=%d", messageCount, msgType, len(data))
    }

    elapsed := time.Since(startTime)
    if elapsed < 90*time.Second {
        t.Errorf("Connection died too early after %v (expected at least 90s)", elapsed)
    } else {
        t.Logf("Workaround successful: connection lasted %v with %d messages", elapsed, messageCount)
    }
}

// startWorkaroundTestRelay starts a relay for workaround testing
func startWorkaroundTestRelay() (relay *run.Relay, port int, err error) {
    cfg := &config.C{
        AppName:             "ORLY-WORKAROUND-TEST",
        DataDir:             "",
        Listen:              "127.0.0.1",
        Port:                3338,
        HealthPort:          0,
        EnableShutdown:      false,
        LogLevel:            "info",
        DBLogLevel:          "warn",
        DBBlockCacheMB:      512,
        DBIndexCacheMB:      256,
        LogToStdout:         false,
        PprofHTTP:           false,
        ACLMode:             "none",
        AuthRequired:        false,
        AuthToWrite:         false,
        SubscriptionEnabled: false,
        MonthlyPriceSats:    6000,
        FollowListFrequency: time.Hour,
        WebDisableEmbedded:  false,
        SprocketEnabled:     false,
        SpiderMode:          "none",
        PolicyEnabled:       false,
    }

    // Set default data dir if not specified
    if cfg.DataDir == "" {
        cfg.DataDir = fmt.Sprintf("/tmp/orly-workaround-test-%d", time.Now().UnixNano())
    }

    // Create options
    cleanup := true
    opts := &run.Options{
        CleanupDataDir: &cleanup,
    }

    // Start relay
    if relay, err = run.Start(cfg, opts); err != nil {
        return nil, 0, fmt.Errorf("failed to start relay: %w", err)
    }

    return relay, cfg.Port, nil
}