Compare commits

6 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 8609e9dc22 | |
| | 3cb05a451c | |
| | 4e3f391c3f | |
| | 9aa1e7fab3 | |
| | 15e2988222 | |
| | 95c6082564 | |
```diff
@@ -31,12 +31,15 @@ type C struct {
 EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
 LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
 DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
+DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
+DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
 LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
 Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
 PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"`
 PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"`
 OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"`
 IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
+IPBlacklist []string `env:"ORLY_IP_BLACKLIST" usage:"comma-separated list of IP addresses to block; matches on prefixes to allow subnets, e.g. 192.168 = 192.168.0.0/16"`
 Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
 Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"`
 ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows, managed (nip-86), none" default:"none"`
@@ -48,6 +51,8 @@ type C struct {
 SubscriptionEnabled bool `env:"ORLY_SUBSCRIPTION_ENABLED" default:"false" usage:"enable subscription-based access control requiring payment for non-directory events"`
 MonthlyPriceSats int64 `env:"ORLY_MONTHLY_PRICE_SATS" default:"6000" usage:"price in satoshis for one month subscription (default ~$2 USD)"`
 RelayURL string `env:"ORLY_RELAY_URL" usage:"base URL for the relay dashboard (e.g., https://relay.example.com)"`
+RelayAddresses []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com) - used by spider to avoid self-connections"`
+FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"`

 // Web UI and dev mode settings
 WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`
```
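Note on the two hunks above: every new field follows the same `env` / `default` / `usage` tag convention, so `ORLY_DB_BLOCK_CACHE_MB`, `ORLY_DB_INDEX_CACHE_MB`, `ORLY_IP_BLACKLIST`, `ORLY_RELAY_ADDRESSES`, and `ORLY_FOLLOW_LIST_FREQUENCY` become tunable without code changes. The project's actual tag loader is not part of this diff; the following is a minimal, hypothetical sketch of how such tags are commonly resolved with reflection:

```go
package main

import (
	"fmt"
	"os"
	"reflect"
	"strconv"
)

// C mirrors two of the new fields; the tag layout matches the diff.
type C struct {
	DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512"`
	DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256"`
}

// load resolves each field from its env tag, falling back to the default tag.
func load(cfg any) error {
	v := reflect.ValueOf(cfg).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		raw, ok := os.LookupEnv(f.Tag.Get("env"))
		if !ok {
			raw = f.Tag.Get("default")
		}
		switch v.Field(i).Kind() {
		case reflect.Int:
			n, err := strconv.Atoi(raw)
			if err != nil {
				return fmt.Errorf("%s: %w", f.Name, err)
			}
			v.Field(i).SetInt(int64(n))
		case reflect.String:
			v.Field(i).SetString(raw)
		}
	}
	return nil
}

func main() {
	var c C
	if err := load(&c); err != nil {
		panic(err)
	}
	fmt.Printf("block=%dMB index=%dMB\n", c.DBBlockCacheMB, c.DBIndexCacheMB)
}
```

For example, setting `ORLY_DB_BLOCK_CACHE_MB=1024` in the environment overrides the 512 MB default.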
```diff
@@ -2,6 +2,7 @@ package app

 import (
 "fmt"
+"time"

 "lol.mleku.dev/chk"
 "lol.mleku.dev/log"
@@ -15,6 +16,19 @@ import (
 )

 func (l *Listener) HandleMessage(msg []byte, remote string) {
+// Handle blacklisted IPs - discard messages but keep connection open until timeout
+if l.isBlacklisted {
+// Check if timeout has been reached
+if time.Now().After(l.blacklistTimeout) {
+log.W.F("blacklisted IP %s timeout reached, closing connection", remote)
+// Close the connection by cancelling the context
+// The websocket handler will detect this and close the connection
+return
+}
+log.D.F("discarding message from blacklisted IP %s (timeout in %v)", remote, time.Until(l.blacklistTimeout))
+return
+}
+
 msgPreview := string(msg)
 if len(msgPreview) > 150 {
 msgPreview = msgPreview[:150] + "..."
```
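The blacklist handling above is a tarpit: rather than rejecting a blacklisted peer outright, the listener keeps the socket open, silently discards everything it sends, and only closes once `blacklistTimeout` passes, which slows down abusive reconnect loops. A self-contained sketch of the same idea over plain TCP (all names here are illustrative, not from the repo):

```go
package main

import (
	"io"
	"log"
	"net"
	"time"
)

// serve parks blacklisted peers for a minute while discarding their input.
func serve(conn net.Conn, blacklisted bool) {
	defer conn.Close()
	deadline := time.Now().Add(time.Minute)
	buf := make([]byte, 4096)
	for {
		if blacklisted && time.Now().After(deadline) {
			log.Printf("tarpit window over for %s, closing", conn.RemoteAddr())
			return
		}
		// Short read deadline so the deadline check above runs periodically.
		_ = conn.SetReadDeadline(time.Now().Add(5 * time.Second))
		n, err := conn.Read(buf)
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Timeout() {
				continue // idle: keep the peer parked
			}
			if err != io.EOF {
				log.Printf("read error: %v", err)
			}
			return
		}
		if blacklisted {
			continue // discard without processing
		}
		_, _ = conn.Write(buf[:n]) // placeholder for real message handling
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:9000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		c, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go serve(c, true) // treat every peer as blacklisted for the demo
	}
}
```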
```diff
@@ -37,6 +37,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 if _, err = env.Unmarshal(msg); chk.E(err) {
 return normalize.Error.Errorf(err.Error())
 }
+
 log.T.C(
 func() string {
 return fmt.Sprintf(
@@ -131,16 +132,18 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 )

 // Process large author lists by breaking them into chunks
-if f.Authors != nil && f.Authors.Len() > 50 {
+if f.Authors != nil && f.Authors.Len() > 1000 {
 log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len())

-// Calculate chunk size based on kinds to avoid OOM
-chunkSize := 50
+// Calculate chunk size to stay under message size limits
+// Each pubkey is 64 hex chars, plus JSON overhead, so ~100 bytes per author
+// Target ~50MB per chunk to stay well under 100MB limit
+chunkSize := ClientMessageSizeLimit / 200 // ~500KB per chunk
 if f.Kinds != nil && f.Kinds.Len() > 0 {
 // Reduce chunk size if there are multiple kinds to prevent too many index ranges
-chunkSize = 50 / f.Kinds.Len()
-if chunkSize < 10 {
-chunkSize = 10 // Minimum chunk size
+chunkSize = chunkSize / f.Kinds.Len()
+if chunkSize < 100 {
+chunkSize = 100 // Minimum chunk size
 }
 }

```
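Worked example of the new chunking arithmetic: with `ClientMessageSizeLimit = 100 * 1024 * 1024` (defined in the next file), the base chunk size is 104857600 / 200 = 524288 authors, reduced proportionally when multiple kinds multiply the index ranges, with a floor of 100. A runnable sketch (the splitting helper is illustrative; the relay's actual REQ fan-out is not shown in this hunk):

```go
package main

import "fmt"

const ClientMessageSizeLimit = 100 * 1024 * 1024 // 100MB, as in this change

// chunkSizeFor mirrors the HandleReq logic: ~200 bytes budgeted per author,
// divided by the number of kinds, floored at 100 authors per chunk.
func chunkSizeFor(numKinds int) int {
	size := ClientMessageSizeLimit / 200 // 524288
	if numKinds > 0 {
		size /= numKinds
	}
	if size < 100 {
		size = 100
	}
	return size
}

// chunkAuthors splits an author list into runs of at most size entries.
func chunkAuthors(authors []string, size int) [][]string {
	var chunks [][]string
	for len(authors) > size {
		chunks = append(chunks, authors[:size])
		authors = authors[size:]
	}
	return append(chunks, authors)
}

func main() {
	fmt.Println(chunkSizeFor(0)) // 524288
	fmt.Println(chunkSizeFor(4)) // 131072
	authors := make([]string, 2500)
	fmt.Println(len(chunkAuthors(authors, 1000))) // 3
}
```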
```diff
@@ -21,6 +21,9 @@ const (
 DefaultPingWait = DefaultPongWait / 2
 DefaultWriteTimeout = 3 * time.Second
 DefaultMaxMessageSize = 100 * units.Mb
+// ClientMessageSizeLimit is the maximum message size that clients can handle
+// This is set to 100MB to allow large messages
+ClientMessageSizeLimit = 100 * 1024 * 1024 // 100MB

 // CloseMessage denotes a close control message. The optional message
 // payload contains a numeric code and text. Use the FormatCloseMessage
@@ -84,6 +87,13 @@ whitelist:
 req: r,
 startTime: time.Now(),
 }
+
+// Check for blacklisted IPs
+listener.isBlacklisted = s.isIPBlacklisted(remote)
+if listener.isBlacklisted {
+log.W.F("detected blacklisted IP %s, marking connection for timeout", remote)
+listener.blacklistTimeout = time.Now().Add(time.Minute) // Timeout after 1 minute
+}
 chal := make([]byte, 32)
 rand.Read(chal)
 listener.challenge.Store([]byte(hex.Enc(chal)))
@@ -130,6 +140,13 @@ whitelist:
 return
 default:
 }
+
+// Check if blacklisted connection has timed out
+if listener.isBlacklisted && time.Now().After(listener.blacklistTimeout) {
+log.W.F("blacklisted IP %s timeout reached, closing connection", remote)
+return
+}
+
 var typ websocket.MessageType
 var msg []byte
 log.T.F("waiting for message from %s", remote)
```
```diff
@@ -17,13 +17,15 @@ import (

 type Listener struct {
 *Server
 conn *websocket.Conn
 ctx context.Context
 remote string
 req *http.Request
 challenge atomic.Bytes
 authedPubkey atomic.Bytes
 startTime time.Time
+isBlacklisted bool // Marker to identify blacklisted IPs
+blacklistTimeout time.Time // When to timeout blacklisted connections
 // Diagnostics: per-connection counters
 msgCount int
 reqCount int
```
app/main.go | 39

```diff
@@ -4,6 +4,8 @@ import (
 "context"
 "fmt"
 "net/http"
+"sync"
+"time"

 "lol.mleku.dev/chk"
 "lol.mleku.dev/log"
@@ -18,11 +20,14 @@ import (
 func Run(
 ctx context.Context, cfg *config.C, db *database.D,
 ) (quit chan struct{}) {
+quit = make(chan struct{})
+var once sync.Once
+
 // shutdown handler
 go func() {
 <-ctx.Done()
 log.I.F("shutting down")
-close(quit)
+once.Do(func() { close(quit) })
 }()
 // get the admins
 var err error
@@ -112,9 +117,37 @@ func Run(
 }
 addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port)
 log.I.F("starting listener on http://%s", addr)
+
+// Create HTTP server for graceful shutdown
+srv := &http.Server{
+Addr: addr,
+Handler: l,
+}
+
 go func() {
-chk.E(http.ListenAndServe(addr, l))
+if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+log.E.F("HTTP server error: %v", err)
+}
 }()
-quit = make(chan struct{})
+
+// Graceful shutdown handler
+go func() {
+<-ctx.Done()
+log.I.F("shutting down HTTP server gracefully")
+
+// Create shutdown context with timeout
+shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
+defer cancelShutdown()
+
+// Shutdown the server gracefully
+if err := srv.Shutdown(shutdownCtx); err != nil {
+log.E.F("HTTP server shutdown error: %v", err)
+} else {
+log.I.F("HTTP server shutdown completed")
+}
+
+once.Do(func() { close(quit) })
+}()
+
 return
 }
```
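The shutdown choreography in `Run` now has two goroutines that can race to close `quit`: the context watcher and the HTTP shutdown handler. Guarding `close(quit)` with a single `sync.Once` makes the close idempotent, so neither path can panic on a double close. A distilled, runnable sketch of the same pattern (handler and address are placeholders):

```go
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"sync"
	"time"
)

func run(ctx context.Context, addr string, h http.Handler) (quit chan struct{}) {
	quit = make(chan struct{})
	var once sync.Once

	srv := &http.Server{Addr: addr, Handler: h}

	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("HTTP server error: %v", err)
		}
	}()

	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			log.Printf("HTTP server shutdown error: %v", err)
		}
		once.Do(func() { close(quit) }) // safe from either goroutine
	}()
	return quit
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	quit := run(ctx, "127.0.0.1:8080", http.NotFoundHandler())
	time.AfterFunc(time.Second, cancel) // simulate a shutdown signal
	<-quit
	log.Println("exited cleanly")
}
```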
```diff
@@ -32,7 +32,6 @@ type Server struct {
 mux *http.ServeMux
 Config *config.C
 Ctx context.Context
-remote string
 publishers *publish.S
 Admins [][]byte
 Owners [][]byte
@@ -50,6 +49,35 @@ type Server struct {
 policyManager *policy.P
 }

+// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
+func (s *Server) isIPBlacklisted(remote string) bool {
+// Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1")
+remoteIP := strings.Split(remote, ":")[0]
+
+// Check static IP blacklist from config first
+if len(s.Config.IPBlacklist) > 0 {
+for _, blocked := range s.Config.IPBlacklist {
+// Allow simple prefix matching for subnets (e.g., "192.168" matches 192.168.0.0/16)
+if blocked != "" && strings.HasPrefix(remoteIP, blocked) {
+return true
+}
+}
+}
+
+// Check if managed ACL is available and active
+if s.Config.ACLMode == "managed" {
+for _, aclInstance := range acl.Registry.ACL {
+if aclInstance.Type() == "managed" {
+if managed, ok := aclInstance.(*acl.Managed); ok {
+return managed.IsIPBlocked(remoteIP)
+}
+}
+}
+}
+
+return false
+}
+
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // Set comprehensive CORS headers for proxy compatibility
 w.Header().Set("Access-Control-Allow-Origin", "*")
```
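Two caveats in `isIPBlacklisted` worth noting: `strings.Split(remote, ":")[0]` truncates IPv6 remotes at the first colon (`net.SplitHostPort` would not), and plain prefix matching is coarse — the prefix "192.168.1" also matches 192.168.100.5. A stricter alternative (not what this commit does) is to treat blacklist entries as CIDR prefixes via the standard `net/netip` package:

```go
package main

import (
	"fmt"
	"net/netip"
)

// isBlocked reports whether remoteIP falls inside any blacklisted CIDR range.
func isBlocked(remoteIP string, cidrs []string) bool {
	addr, err := netip.ParseAddr(remoteIP)
	if err != nil {
		return false
	}
	for _, c := range cidrs {
		p, err := netip.ParsePrefix(c)
		if err != nil {
			continue // skip malformed entries
		}
		if p.Contains(addr) {
			return true
		}
	}
	return false
}

func main() {
	blacklist := []string{"192.168.0.0/16", "10.0.0.0/8"}
	fmt.Println(isBlocked("192.168.100.5", blacklist)) // true
	fmt.Println(isBlocked("192.169.0.1", blacklist))   // false
}
```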
```diff
@@ -18,8 +18,8 @@ address = "0.0.0.0"
 messages_per_sec = 0
 subscriptions_per_min = 0
 max_event_bytes = 65535
-max_ws_message_bytes = 131072
-max_ws_frame_bytes = 131072
+max_ws_message_bytes = 104857600
+max_ws_frame_bytes = 104857600

 [authorization]
 pubkey_whitelist = []
@@ -41,7 +41,7 @@ relay {
 }

 # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
-maxWebsocketPayloadSize = 131072
+maxWebsocketPayloadSize = 104857600

 # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
 autoPingSeconds = 55
@@ -250,7 +250,7 @@ relay {
 }

 # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required)
-maxWebsocketPayloadSize = 131072
+maxWebsocketPayloadSize = 104857600

 # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required)
 autoPingSeconds = 55
@@ -332,8 +332,8 @@ address = "0.0.0.0"
 messages_per_sec = 0
 subscriptions_per_min = 0
 max_event_bytes = 65535
-max_ws_message_bytes = 131072
-max_ws_frame_bytes = 131072
+max_ws_message_bytes = 104857600
+max_ws_frame_bytes = 104857600

 [authorization]
 pubkey_whitelist = []
```
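All four raised limits are the same value: 104857600 bytes = 100 * 1024 * 1024, so the bundled sample configs for other relay implementations now accept websocket messages up to the same 100MB `ClientMessageSizeLimit` introduced earlier in this change.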
main.go | 199

```diff
@@ -9,6 +9,8 @@ import (
 "os/exec"
 "os/signal"
 "runtime"
+"sync"
+"syscall"
 "time"

 "github.com/pkg/profile"
@@ -21,6 +23,7 @@ import (
 "next.orly.dev/pkg/database"
 "next.orly.dev/pkg/encoders/hex"
 "next.orly.dev/pkg/spider"
+"next.orly.dev/pkg/utils/interrupt"
 "next.orly.dev/pkg/version"
)

@@ -53,46 +56,61 @@ func main() {
 runtime.GOMAXPROCS(runtime.NumCPU() * 4)
 var err error
 var cfg *config.C
 if cfg, err = config.New(); chk.T(err) {
 }
 log.I.F("starting %s %s", cfg.AppName, version.V)

 // Handle 'identity' subcommand: print relay identity secret and pubkey and exit
 if config.IdentityRequested() {
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
 var db *database.D
 if db, err = database.New(ctx, cancel, cfg.DataDir, cfg.DBLogLevel); chk.E(err) {
 os.Exit(1)
 }
 defer db.Close()
 skb, err := db.GetOrCreateRelayIdentitySecret()
 if chk.E(err) {
 os.Exit(1)
 }
 pk, err := keys.SecretBytesToPubKeyHex(skb)
 if chk.E(err) {
 os.Exit(1)
 }
 fmt.Printf("identity secret: %s\nidentity pubkey: %s\n", hex.Enc(skb), pk)
 os.Exit(0)
 }

 // If OpenPprofWeb is true and profiling is enabled, we need to ensure HTTP profiling is also enabled
 if cfg.OpenPprofWeb && cfg.Pprof != "" && !cfg.PprofHTTP {
 log.I.F("enabling HTTP pprof server to support web viewer")
 cfg.PprofHTTP = true
 }
+// Ensure profiling is stopped on interrupts (SIGINT/SIGTERM) as well as on normal exit
+var profileStopOnce sync.Once
+profileStop := func() {}
 switch cfg.Pprof {
 case "cpu":
 if cfg.PprofPath != "" {
 prof := profile.Start(
 profile.CPUProfile, profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("cpu profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.CPUProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("cpu profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "memory":
 if cfg.PprofPath != "" {
@@ -100,10 +118,22 @@ func main() {
 profile.MemProfile, profile.MemProfileRate(32),
 profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("memory profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.MemProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("memory profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "allocation":
 if cfg.PprofPath != "" {
@@ -111,30 +141,66 @@ func main() {
 profile.MemProfileAllocs, profile.MemProfileRate(32),
 profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("allocation profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.MemProfileAllocs)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("allocation profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "heap":
 if cfg.PprofPath != "" {
 prof := profile.Start(
 profile.MemProfileHeap, profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("heap profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.MemProfileHeap)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("heap profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "mutex":
 if cfg.PprofPath != "" {
 prof := profile.Start(
 profile.MutexProfile, profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("mutex profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.MutexProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("mutex profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "threadcreate":
 if cfg.PprofPath != "" {
@@ -142,33 +208,75 @@ func main() {
 profile.ThreadcreationProfile,
 profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("threadcreate profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.ThreadcreationProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("threadcreate profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "goroutine":
 if cfg.PprofPath != "" {
 prof := profile.Start(
 profile.GoroutineProfile, profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("goroutine profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.GoroutineProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("goroutine profiling stopped and flushed")
+})
+}
+defer profileStop()
 }
 case "block":
 if cfg.PprofPath != "" {
 prof := profile.Start(
 profile.BlockProfile, profile.ProfilePath(cfg.PprofPath),
 )
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("block profiling stopped and flushed")
+})
+}
+defer profileStop()
 } else {
 prof := profile.Start(profile.BlockProfile)
-defer prof.Stop()
+profileStop = func() {
+profileStopOnce.Do(func() {
+prof.Stop()
+log.I.F("block profiling stopped and flushed")
+})
+}
+defer profileStop()
 }

 }

+// Register a handler so profiling is stopped when an interrupt is received
+interrupt.AddHandler(func() {
+log.I.F("interrupt received: stopping profiling")
+profileStop()
+})
 ctx, cancel := context.WithCancel(context.Background())
 var db *database.D
 if db, err = database.New(
@@ -220,7 +328,7 @@ func main() {

 // Open the pprof web viewer if enabled
 if cfg.OpenPprofWeb && cfg.Pprof != "" {
-pprofURL := fmt.Sprintf("http://localhost:6060/debug/pprof/")
+pprofURL := "http://localhost:6060/debug/pprof/"
 go func() {
 // Wait a moment for the server to start
 time.Sleep(500 * time.Millisecond)
@@ -277,21 +385,24 @@ func main() {

 quit := app.Run(ctx, cfg, db)
 sigs := make(chan os.Signal, 1)
-signal.Notify(sigs, os.Interrupt)
+signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 for {
 select {
 case <-sigs:
 fmt.Printf("\r")
-cancel()
+log.I.F("received shutdown signal, starting graceful shutdown")
+cancel() // This will trigger HTTP server shutdown
+<-quit // Wait for HTTP server to shut down
 chk.E(db.Close())
 log.I.F("exiting")
 return
 case <-quit:
+log.I.F("application quit signal received")
 cancel()
 chk.E(db.Close())
 log.I.F("exiting")
 return
 }
 }
-log.I.F("exiting")
+// log.I.F("exiting")
 }
```
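The `profileStop`/`profileStopOnce` wiring above is repeated verbatim for all eight profile modes. A possible consolidation (a sketch, not part of this change) wraps `github.com/pkg/profile` once and returns an idempotent stop function that is safe to call from both a `defer` and an interrupt handler:

```go
package main

import (
	"log"
	"sync"

	"github.com/pkg/profile"
)

// startProfile begins profiling with the given options and returns a stop
// function; sync.Once makes repeated calls harmless.
func startProfile(mode string, opts ...func(*profile.Profile)) func() {
	prof := profile.Start(opts...)
	var once sync.Once
	return func() {
		once.Do(func() {
			prof.Stop()
			log.Printf("%s profiling stopped and flushed", mode)
		})
	}
}

func main() {
	stop := startProfile("cpu", profile.CPUProfile, profile.ProfilePath("."))
	defer stop()
	// ... work ...
	stop() // explicit early stop; the deferred call is then a no-op
}
```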
```diff
@@ -44,6 +44,8 @@ type Follows struct {
 follows [][]byte
 updated chan struct{}
 subsCancel context.CancelFunc
+// Track last follow list fetch time
+lastFollowListFetch time.Time
 }

 func (f *Follows) Configure(cfg ...any) (err error) {
@@ -176,6 +178,33 @@ func (f *Follows) adminRelays() (urls []string) {
 copy(admins, f.admins)
 f.followsMx.RUnlock()
 seen := make(map[string]struct{})
+// Build a set of normalized self relay addresses to avoid self-connections
+selfSet := make(map[string]struct{})
+selfHosts := make(map[string]struct{})
+if f.cfg != nil && len(f.cfg.RelayAddresses) > 0 {
+for _, a := range f.cfg.RelayAddresses {
+n := string(normalize.URL(a))
+if n == "" {
+continue
+}
+selfSet[n] = struct{}{}
+// Also record hostname (without port) for robust matching
+// Accept simple splitting; normalize.URL ensures scheme://host[:port]
+host := n
+if i := strings.Index(host, "://"); i >= 0 {
+host = host[i+3:]
+}
+if j := strings.Index(host, "/"); j >= 0 {
+host = host[:j]
+}
+if k := strings.Index(host, ":"); k >= 0 {
+host = host[:k]
+}
+if host != "" {
+selfHosts[host] = struct{}{}
+}
+}
+}
+
 // First, try to get relay URLs from admin kind 10002 events
 for _, adm := range admins {
@@ -206,6 +235,26 @@ func (f *Follows) adminRelays() (urls []string) {
 if n == "" {
 continue
 }
+// Skip if this URL is one of our configured self relay addresses or hosts
+if _, isSelf := selfSet[n]; isSelf {
+log.D.F("follows syncer: skipping configured self relay address: %s", n)
+continue
+}
+// Host match
+host := n
+if i := strings.Index(host, "://"); i >= 0 {
+host = host[i+3:]
+}
+if j := strings.Index(host, "/"); j >= 0 {
+host = host[:j]
+}
+if k := strings.Index(host, ":"); k >= 0 {
+host = host[:k]
+}
+if _, isSelfHost := selfHosts[host]; isSelfHost {
+log.D.F("follows syncer: skipping configured self relay address: %s", n)
+continue
+}
 if _, ok := seen[n]; ok {
 continue
 }
@@ -226,6 +275,26 @@ func (f *Follows) adminRelays() (urls []string) {
 log.W.F("invalid bootstrap relay URL: %s", relay)
 continue
 }
+// Skip if this URL is one of our configured self relay addresses or hosts
+if _, isSelf := selfSet[n]; isSelf {
+log.D.F("follows syncer: skipping configured self relay address: %s", n)
+continue
+}
+// Host match
+host := n
+if i := strings.Index(host, "://"); i >= 0 {
+host = host[i+3:]
+}
+if j := strings.Index(host, "/"); j >= 0 {
+host = host[:j]
+}
+if k := strings.Index(host, ":"); k >= 0 {
+host = host[:k]
+}
+if _, isSelfHost := selfHosts[host]; isSelfHost {
+log.D.F("follows syncer: skipping configured self relay address: %s", n)
+continue
+}
 if _, ok := seen[n]; ok {
 continue
 }
@@ -240,7 +309,7 @@ func (f *Follows) adminRelays() (urls []string) {
 return
 }

-func (f *Follows) startSubscriptions(ctx context.Context) {
+func (f *Follows) startEventSubscriptions(ctx context.Context) {
 // build authors list: admins + follows
 f.followsMx.RLock()
 authors := make([][]byte, 0, len(f.admins)+len(f.follows))
@@ -257,10 +326,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 log.W.F("follows syncer: no admin relays found in DB (kind 10002) and no bootstrap relays configured")
 return
 }
-log.T.F(
+log.I.F(
 "follows syncer: subscribing to %d relays for %d authors", len(urls),
 len(authors),
 )
+log.I.F("follows syncer: starting follow list fetching from relays: %v", urls)
 for _, u := range urls {
 u := u
 go func() {
@@ -336,11 +406,13 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 }
 backoff = time.Second
 log.T.F("follows syncer: successfully connected to %s", u)
+log.I.F("follows syncer: subscribing to events from relay %s", u)

-// send REQ for kind 3 (follow lists), kind 10002 (relay lists), and all events from follows
+// send REQ for admin follow lists, relay lists, and all events from follows
 ff := &filter.S{}
+// Add filter for admin follow lists (kind 3) - for immediate updates
 f1 := &filter.F{
-Authors: tag.NewFromBytesSlice(authors...),
+Authors: tag.NewFromBytesSlice(f.admins...),
 Kinds: kind.NewS(kind.New(kind.FollowList.K)),
 Limit: values.ToUintPointer(100),
 }
@@ -354,103 +426,139 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 f3 := &filter.F{
 Authors: tag.NewFromBytesSlice(authors...),
 Since: oneMonthAgo,
-Limit: values.ToUintPointer(1000),
+Limit: values.ToUintPointer(500),
 }
 *ff = append(*ff, f1, f2, f3)
-req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
+// Use a subscription ID for event sync (no follow lists)
+subID := "event-sync"
+req := reqenvelope.NewFrom([]byte(subID), ff)
+reqBytes := req.Marshal(nil)
+log.T.F("follows syncer: outbound REQ to %s: %s", u, string(reqBytes))
 if err = c.Write(
-ctx, websocket.MessageText, req.Marshal(nil),
+ctx, websocket.MessageText, reqBytes,
 ); chk.E(err) {
 log.W.F(
-"follows syncer: failed to send REQ to %s: %v", u, err,
+"follows syncer: failed to send event REQ to %s: %v", u, err,
 )
 _ = c.Close(websocket.StatusInternalError, "write failed")
 continue
 }
 log.T.F(
-"follows syncer: sent REQ to %s for kind 3, 10002, and all events (last 30 days) from followed users",
+"follows syncer: sent event REQ to %s for admin follow lists, kind 10002, and all events (last 30 days) from followed users",
 u,
 )
-// read loop
+// read loop with keepalive
+keepaliveTicker := time.NewTicker(30 * time.Second)
+defer keepaliveTicker.Stop()
+
 for {
 select {
 case <-ctx.Done():
 _ = c.Close(websocket.StatusNormalClosure, "ctx done")
 return
-default:
-}
-_, data, err := c.Read(ctx)
-if err != nil {
-_ = c.Close(websocket.StatusNormalClosure, "read err")
-break
-}
-label, rem, err := envelopes.Identify(data)
-if chk.E(err) {
+case <-keepaliveTicker.C:
+// Send ping to keep connection alive
+if err := c.Ping(ctx); err != nil {
+log.T.F("follows syncer: ping failed for %s: %v", u, err)
+break
+}
+log.T.F("follows syncer: sent ping to %s", u)
 continue
-}
-switch label {
-case eventenvelope.L:
-res, _, err := eventenvelope.ParseResult(rem)
-if chk.E(err) || res == nil || res.Event == nil {
+default:
+// Set a read timeout to avoid hanging
+readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second)
+_, data, err := c.Read(readCtx)
+readCancel()
+if err != nil {
+_ = c.Close(websocket.StatusNormalClosure, "read err")
+break
+}
+label, rem, err := envelopes.Identify(data)
+if chk.E(err) {
 continue
 }
-// verify signature before saving
-if ok, err := res.Event.Verify(); chk.T(err) || !ok {
-continue
-}
+switch label {
+case eventenvelope.L:
+res, _, err := eventenvelope.ParseResult(rem)
+if chk.E(err) || res == nil || res.Event == nil {
+continue
+}
+// verify signature before saving
+if ok, err := res.Event.Verify(); chk.T(err) || !ok {
+continue
+}
+
 // Process events based on kind
 switch res.Event.Kind {
 case kind.FollowList.K:
-log.T.F(
-"follows syncer: received kind 3 (follow list) event from %s on relay %s",
-hex.EncodeToString(res.Event.Pubkey), u,
-)
-// Extract followed pubkeys from 'p' tags in kind 3 events
-f.extractFollowedPubkeys(res.Event)
-case kind.RelayListMetadata.K:
-log.T.F(
-"follows syncer: received kind 10002 (relay list) event from %s on relay %s",
+// Check if this is from an admin and process immediately
+if f.isAdminPubkey(res.Event.Pubkey) {
+log.I.F(
+"follows syncer: received admin follow list from %s on relay %s - processing immediately",
+hex.EncodeToString(res.Event.Pubkey), u,
+)
+f.extractFollowedPubkeys(res.Event)
+} else {
+log.T.F(
+"follows syncer: received follow list from non-admin %s on relay %s - ignoring",
+hex.EncodeToString(res.Event.Pubkey), u,
+)
+}
+case kind.RelayListMetadata.K:
+log.T.F(
+"follows syncer: received kind 10002 (relay list) event from %s on relay %s",
 hex.EncodeToString(res.Event.Pubkey), u,
 )
-}
-if _, err = f.D.SaveEvent(
-ctx, res.Event,
-); err != nil {
-if !strings.HasPrefix(
-err.Error(), "blocked:",
-) {
-log.W.F(
-"follows syncer: save event failed: %v",
-err,
+default:
+// Log all other events from followed users
+log.T.F(
+"follows syncer: received kind %d event from %s on relay %s",
+res.Event.Kind,
+hex.EncodeToString(res.Event.Pubkey), u,
 )
 }
-// ignore duplicates and continue
-} else {
-// Only dispatch if the event was newly saved (no error)
-if f.pubs != nil {
-go f.pubs.Deliver(res.Event)
+if _, err = f.D.SaveEvent(
+ctx, res.Event,
+); err != nil {
+if !strings.HasPrefix(
+err.Error(), "blocked:",
+) {
+log.W.F(
+"follows syncer: save event failed: %v",
+err,
+)
+}
+// ignore duplicates and continue
+} else {
+// Only dispatch if the event was newly saved (no error)
+if f.pubs != nil {
+go f.pubs.Deliver(res.Event)
+}
+// log.I.F(
+// "saved new event from follows syncer: %0x",
+// res.Event.ID,
+// )
 }
-// log.I.F(
-// "saved new event from follows syncer: %0x",
-// res.Event.ID,
-// )
-}
 case eoseenvelope.L:
-// ignore, continue subscription
+log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u)
+// Continue the subscription for new events
 default:
 // ignore other labels
 }
 }
-// loop reconnect
+}
+// Connection dropped, reconnect after delay
+log.W.F("follows syncer: connection to %s dropped, will reconnect in 30 seconds", u)
+
+// Wait before reconnecting to avoid tight reconnection loops
+timer := time.NewTimer(30 * time.Second)
+select {
+case <-ctx.Done():
+return
+case <-timer.C:
+// Continue to reconnect
+}
 }
 }()
 }
@@ -458,6 +566,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) {

 func (f *Follows) Syncer() {
 log.I.F("starting follows syncer")
+
+// Start periodic follow list fetching
+go f.startPeriodicFollowListFetching()
+
+// Start event subscriptions
 go func() {
 // start immediately if Configure already ran
 for {
@@ -478,7 +591,7 @@ func (f *Follows) Syncer() {
 f.subsCancel = cancel
 innerCancel = cancel
 log.I.F("follows syncer: (re)opening subscriptions")
-f.startSubscriptions(ctx)
+f.startEventSubscriptions(ctx)
 }
 // small sleep to avoid tight loop if updated fires rapidly
 if innerCancel == nil {
@@ -489,6 +602,150 @@ func (f *Follows) Syncer() {
 f.updated <- struct{}{}
 }

+
+// startPeriodicFollowListFetching starts periodic fetching of admin follow lists
+func (f *Follows) startPeriodicFollowListFetching() {
+frequency := f.cfg.FollowListFrequency
+if frequency == 0 {
+frequency = time.Hour // Default to 1 hour
+}
+
+log.I.F("starting periodic follow list fetching every %v", frequency)
+
+ticker := time.NewTicker(frequency)
+defer ticker.Stop()
+
+// Fetch immediately on startup
+f.fetchAdminFollowLists()
+
+for {
+select {
+case <-f.Ctx.Done():
+log.D.F("periodic follow list fetching stopped due to context cancellation")
+return
+case <-ticker.C:
+f.fetchAdminFollowLists()
+}
+}
+}
+
+// fetchAdminFollowLists fetches follow lists from admin relays
+func (f *Follows) fetchAdminFollowLists() {
+log.I.F("follows syncer: fetching admin follow lists")
+
+urls := f.adminRelays()
+if len(urls) == 0 {
+log.W.F("follows syncer: no admin relays found for follow list fetching")
+return
+}
+
+// build authors list: admins only (not follows)
+f.followsMx.RLock()
+authors := make([][]byte, len(f.admins))
+copy(authors, f.admins)
+f.followsMx.RUnlock()
+
+if len(authors) == 0 {
+log.W.F("follows syncer: no admins to fetch follow lists for")
+return
+}
+
+log.I.F("follows syncer: fetching follow lists from %d relays for %d admins", len(urls), len(authors))
+
+for _, u := range urls {
+go f.fetchFollowListsFromRelay(u, authors)
+}
+}
+
+// fetchFollowListsFromRelay fetches follow lists from a specific relay
+func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
+ctx, cancel := context.WithTimeout(f.Ctx, 30*time.Second)
+defer cancel()
+
+// Create proper headers for the WebSocket connection
+headers := http.Header{}
+headers.Set("User-Agent", "ORLY-Relay/0.9.2")
+headers.Set("Origin", "https://orly.dev")
+
+// Use proper WebSocket dial options
+dialOptions := &websocket.DialOptions{
+HTTPHeader: headers,
+}
+
+c, _, err := websocket.Dial(ctx, relayURL, dialOptions)
+if err != nil {
+log.W.F("follows syncer: failed to connect to %s for follow list fetch: %v", relayURL, err)
+return
+}
+defer c.Close(websocket.StatusNormalClosure, "follow list fetch complete")
+
+log.I.F("follows syncer: fetching follow lists from relay %s", relayURL)
+
+// Create filter for follow lists only (kind 3)
+ff := &filter.S{}
+f1 := &filter.F{
+Authors: tag.NewFromBytesSlice(authors...),
+Kinds: kind.NewS(kind.New(kind.FollowList.K)),
+Limit: values.ToUintPointer(100),
+}
+*ff = append(*ff, f1)
+
+// Use a specific subscription ID for follow list fetching
+subID := "follow-lists-fetch"
+req := reqenvelope.NewFrom([]byte(subID), ff)
+reqBytes := req.Marshal(nil)
+log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes))
+if err = c.Write(ctx, websocket.MessageText, reqBytes); chk.E(err) {
+log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err)
+return
+}
+
+log.T.F("follows syncer: sent follow list REQ to %s", relayURL)
+
+// Read follow list events with timeout
+timeout := time.After(10 * time.Second)
+for {
+select {
+case <-ctx.Done():
+return
+case <-timeout:
+log.T.F("follows syncer: timeout reading follow lists from %s", relayURL)
+return
+default:
+}
+
+_, data, err := c.Read(ctx)
+if err != nil {
+log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err)
+return
+}
+
+label, rem, err := envelopes.Identify(data)
+if chk.E(err) {
+continue
+}
+
+switch label {
+case eventenvelope.L:
+res, _, err := eventenvelope.ParseResult(rem)
+if chk.E(err) || res == nil || res.Event == nil {
+continue
+}
+
+// Process follow list events
+if res.Event.Kind == kind.FollowList.K {
+log.I.F("follows syncer: received follow list from %s on relay %s",
+hex.EncodeToString(res.Event.Pubkey), relayURL)
+f.extractFollowedPubkeys(res.Event)
+}
+case eoseenvelope.L:
+log.T.F("follows syncer: end of follow list events from %s", relayURL)
+return
+default:
+// ignore other labels
+}
+}
+}
+
 // GetFollowedPubkeys returns a copy of the followed pubkeys list
 func (f *Follows) GetFollowedPubkeys() [][]byte {
 f.followsMx.RLock()
@@ -499,6 +756,19 @@ func (f *Follows) GetFollowedPubkeys() [][]byte {
 return followedPubkeys
 }

+
+// isAdminPubkey checks if a pubkey belongs to an admin
+func (f *Follows) isAdminPubkey(pubkey []byte) bool {
+f.followsMx.RLock()
+defer f.followsMx.RUnlock()
+
+for _, admin := range f.admins {
+if utils.FastEqual(admin, pubkey) {
+return true
+}
+}
+return false
+}
+
 // extractFollowedPubkeys extracts followed pubkeys from 'p' tags in kind 3 events
 func (f *Follows) extractFollowedPubkeys(event *event.E) {
 if event.Kind != kind.FollowList.K {
```
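`adminRelays` now repeats the same `"://"` / `"/"` / `":"` splitting in three places. A stdlib-based helper (a sketch, not in this commit) would extract the hostname via `net/url`, which also handles IPv6 literals and ports correctly:

```go
package main

import (
	"fmt"
	"net/url"
)

// hostOf returns the hostname of a normalized relay address such as
// "wss://relay.example.com:443/path", or "" if it cannot be parsed.
func hostOf(addr string) string {
	u, err := url.Parse(addr)
	if err != nil {
		return ""
	}
	return u.Hostname() // strips port and IPv6 brackets
}

func main() {
	fmt.Println(hostOf("wss://relay.example.com:443/sub")) // relay.example.com
	fmt.Println(hostOf("wss://[::1]:8080"))                // ::1
}
```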
```diff
@@ -5,6 +5,7 @@ import (
 "errors"
 "os"
 "path/filepath"
+"strconv"
 "time"

 "github.com/dgraph-io/badger/v4"
@@ -52,18 +53,29 @@ func New(
 }

 opts := badger.DefaultOptions(d.dataDir)
-// Use sane defaults to avoid excessive memory usage during startup.
-// Badger's default BlockSize is small (e.g., 4KB). Overriding it to very large values
-// can cause massive allocations and OOM panics during deployments.
-// Set BlockCacheSize to a moderate value and keep BlockSize small.
-opts.BlockCacheSize = int64(256 * units.Mb) // 256 MB cache
-opts.BlockSize = 4 * units.Kb // 4 KB block size
+// Configure caches based on environment to better match workload.
+// Defaults aim for higher hit ratios under read-heavy workloads while remaining safe.
+var blockCacheMB = 512 // default 512 MB
+var indexCacheMB = 256 // default 256 MB
+if v := os.Getenv("ORLY_DB_BLOCK_CACHE_MB"); v != "" {
+if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
+blockCacheMB = n
+}
+}
+if v := os.Getenv("ORLY_DB_INDEX_CACHE_MB"); v != "" {
+if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
+indexCacheMB = n
+}
+}
+opts.BlockCacheSize = int64(blockCacheMB * units.Mb)
+opts.IndexCacheSize = int64(indexCacheMB * units.Mb)
+opts.BlockSize = 4 * units.Kb // 4 KB block size
 // Prevent huge allocations during table building and memtable flush.
 // Badger's TableBuilder buffer is sized by BaseTableSize; ensure it's small.
 opts.BaseTableSize = 64 * units.Mb // 64 MB per table (default ~2MB, increased for fewer files but safe)
 opts.MemTableSize = 64 * units.Mb // 64 MB memtable to match table size
 // Keep value log files to a moderate size as well
 opts.ValueLogFileSize = 256 * units.Mb // 256 MB value log files
 opts.CompactL0OnClose = true
 opts.LmaxCompaction = true
 opts.Compression = options.None
```
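The same cache knobs can also be set through Badger v4's fluent option setters; a short usage sketch of the options this hunk assigns directly on the struct (the path is a placeholder):

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	const mb = 1 << 20
	opts := badger.DefaultOptions("/tmp/orly-db").
		WithBlockCacheSize(512 * mb). // read-path block cache
		WithIndexCacheSize(256 * mb). // index blocks and bloom filters
		WithBlockSize(4 * 1024)       // small blocks bound per-read allocations
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```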
```diff
@@ -263,6 +263,7 @@ func (r *Client) ConnectWithTLS(
 case wr := <-r.writeQueue:
 // all write requests will go through this to prevent races
 // log.D.F("{%s} sending %v\n", r.URL, string(wr.msg))
+log.T.F("WS.Client: outbound message to %s: %s", r.URL, string(wr.msg))
 if err = r.Connection.WriteMessage(
 r.connectionContext, wr.msg,
 ); err != nil {
```
@@ -171,6 +171,7 @@ func (sub *Subscription) Close() {
 	if sub.Client.IsConnected() {
 		closeMsg := closeenvelope.NewFrom(sub.id)
 		closeb := closeMsg.Marshal(nil)
+		log.T.F("WS.Subscription.Close: outbound CLOSE to %s: %s", sub.Client.URL, string(closeb))
 		<-sub.Client.Write(closeb)
 	}
 }
@@ -191,6 +192,7 @@ func (sub *Subscription) Fire() (err error) {
 		"WS.Subscription.Fire: sending REQ id=%s filters=%d bytes=%d",
 		sub.GetID(), len(*sub.Filters), len(reqb),
 	)
+	log.T.F("WS.Subscription.Fire: outbound REQ to %s: %s", sub.Client.URL, string(reqb))
 	if err = <-sub.Client.Write(reqb); err != nil {
 		err = fmt.Errorf("failed to write: %w", err)
 		log.T.F(
@@ -23,8 +23,8 @@ import (
 const (
 	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
 	SpiderLastScanMarker = "spider_last_scan_time"
-	// MaxWebSocketMessageSize is the maximum size for WebSocket messages to avoid 32KB limit
-	MaxWebSocketMessageSize = 30 * 1024 // 30KB to be safe
+	// MaxWebSocketMessageSize is the maximum size for WebSocket messages
+	MaxWebSocketMessageSize = 100 * 1024 * 1024 // 100MB
 	// PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars)
 	PubkeyHexSize = 64
 )
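The constant jumps from a 30KB cap (kept under an assumed 32KB frame limit) to 100MB, so message size effectively stops being the binding constraint when batching pubkeys into a REQ. Rough, illustrative arithmetic for what the old cap implied, with a hypothetical per-entry overhead of three bytes (quotes plus comma) and ignoring envelope framing:

    // maxPubkeysPerRequest estimates how many hex-encoded pubkeys fit in one
    // REQ under a given byte budget; the overhead figure is an assumption.
    func maxPubkeysPerRequest(budget int) int {
    	const pubkeyHexSize = 64 // mirrors PubkeyHexSize above
    	const perEntryOverhead = 3
    	return budget / (pubkeyHexSize + perEntryOverhead)
    }

With the old 30*1024 budget that works out to roughly 458 pubkeys per request; at 100MB, the author-chunking seen later in queryRelayForEvents, not the frame size, sets the practical batch size.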
@@ -34,6 +34,8 @@ type Spider struct {
 	cfg *config.C
 	ctx context.Context
 	cancel context.CancelFunc
+	// Configured relay addresses for self-detection
+	relayAddresses []string
 }
 
 func New(
@@ -41,10 +43,11 @@ func New(
 	cancel context.CancelFunc,
 ) *Spider {
 	return &Spider{
 		db: db,
 		cfg: cfg,
 		ctx: ctx,
 		cancel: cancel,
+		relayAddresses: cfg.RelayAddresses,
 	}
 }
 
@@ -187,6 +190,7 @@ func (s *Spider) performSync(startTime, endTime time.Time) error {
 	// 4. Query each relay for events from followed pubkeys in the time range
 	eventsFound := 0
 	for _, relayURL := range relayURLs {
+		log.I.F("Spider sync: fetching follow lists from relay %s", relayURL)
 		count, err := s.queryRelayForEvents(
 			relayURL, followedPubkeys, startTime, endTime,
 		)
@@ -194,6 +198,7 @@ func (s *Spider) performSync(startTime, endTime time.Time) error {
 			log.E.F("Spider sync: error querying relay %s: %v", relayURL, err)
 			continue
 		}
+		log.I.F("Spider sync: completed fetching from relay %s, found %d events", relayURL, count)
 		eventsFound += count
 	}
 
@@ -263,6 +268,18 @@ func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
 		if n == "" {
 			continue
 		}
+		// Skip if this relay is one of the configured relay addresses
+		skipRelay := false
+		for _, relayAddr := range s.relayAddresses {
+			if n == relayAddr {
+				log.D.F("spider: skipping configured relay address: %s", n)
+				skipRelay = true
+				break
+			}
+		}
+		if skipRelay {
+			continue
+		}
 		if _, ok := seen[n]; ok {
 			continue
 		}
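This block gives the spider a way to avoid crawling itself: any discovered relay whose address exactly equals one of the configured ORLY_RELAY_ADDRESSES entries is dropped before deduplication. Factored into a predicate, the same logic reads as follows (a sketch; isSelfRelay is a hypothetical name, the hunk above inlines this loop):

    // isSelfRelay reports whether candidate exactly matches any configured
    // relay address.
    func isSelfRelay(candidate string, relayAddresses []string) bool {
    	for _, addr := range relayAddresses {
    		if candidate == addr {
    			return true
    		}
    	}
    	return false
    }

Because the comparison is exact, a discovered URL such as wss://test.orly.dev only matches if the configured address uses the same form; any normalization is assumed to happen before this check.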
@@ -340,7 +357,7 @@ func (s *Spider) queryRelayForEvents(
 		Authors: tag.NewFromBytesSlice(chunk...),
 		Since: timestamp.FromUnix(startTime.Unix()),
 		Until: timestamp.FromUnix(endTime.Unix()),
-		Limit: func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
+		Limit: func() *uint { l := uint(500); return &l }(), // Limit to avoid overwhelming
 	}
 
 	// Subscribe to get events for this chunk
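The Limit field takes a *uint, which is why the hunk uses an immediately invoked closure to take the address of a literal; if the pattern recurs, a tiny named helper (hypothetical, not in the codebase) reads more clearly:

    // uintPtr returns a pointer to v, for optional fields such as Limit.
    func uintPtr(v uint) *uint { return &v }

    // usage: Limit: uintPtr(500),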
@@ -1 +1 @@
-v0.17.4
+v0.17.6
147
scripts/run-relay-pprof.sh
Executable file
@@ -0,0 +1,147 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Run the relay with CPU profiling enabled, wait 60s, then open the
+# generated profile using `go tool pprof` web UI.
+#
+# Notes:
+# - Builds a temporary relay binary in /tmp and deletes it on exit.
+# - Uses the exact env requested, plus ORLY_PPROF=cpu and a deterministic
+#   ORLY_PPROF_PATH inside a temp dir.
+# - Profiles for DURATION seconds (default 60).
+# - Launches pprof web UI at http://localhost:8000 and attempts to open browser.
+
+DURATION="${DURATION:-60}"
+HEALTH_PORT="${HEALTH_PORT:-18081}"
+ROOT_DIR="/home/mleku/src/next.orly.dev"
+LISTEN_HOST="${LISTEN_HOST:-10.0.0.2}"
+
+cd "$ROOT_DIR"
+
+# Refresh embedded web assets
+reset || true
+./scripts/update-embedded-web.sh || true
+
+TMP_DIR="$(mktemp -d -t orly-pprof-XXXXXX)"
+BIN_PATH="$TMP_DIR/next.orly.dev"
+LOG_FILE="$TMP_DIR/relay.log"
+PPROF_FILE=""
+RELAY_PID=""
+PPROF_DIR="$TMP_DIR/profiles"
+mkdir -p "$PPROF_DIR"
+
+cleanup() {
+  # Try to stop relay if still running
+  if [[ -n "${RELAY_PID}" ]] && kill -0 "${RELAY_PID}" 2>/dev/null; then
+    kill "${RELAY_PID}" || true
+    wait "${RELAY_PID}" || true
+  fi
+  rm -f "$BIN_PATH" 2>/dev/null || true
+  rm -rf "$TMP_DIR" 2>/dev/null || true
+}
+trap cleanup EXIT
+
+echo "[run-relay-pprof] Building relay binary ..."
+GOFLAGS="${GOFLAGS:-}" go build -o "$BIN_PATH" .
+
+echo "[run-relay-pprof] Starting relay with CPU profiling ..."
+(
+  ORLY_LOG_LEVEL=debug \
+  ORLY_LISTEN="$LISTEN_HOST" \
+  ORLY_PORT=3334 \
+  ORLY_ADMINS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku \
+  ORLY_ACL_MODE=follows \
+  ORLY_SPIDER_MODE=none \
+  ORLY_RELAY_ADDRESSES=test.orly.dev \
+  ORLY_IP_BLACKLIST=192.71.213.188 \
+  ORLY_HEALTH_PORT="$HEALTH_PORT" \
+  ORLY_ENABLE_SHUTDOWN=true \
+  ORLY_PPROF_HTTP=true \
+  ORLY_OPEN_PPROF_WEB=true \
+  "$BIN_PATH"
+) >"$LOG_FILE" 2>&1 &
+RELAY_PID=$!
+
+# Wait for pprof HTTP server readiness
+PPROF_BASE="http://${LISTEN_HOST}:6060"
+echo "[run-relay-pprof] Waiting for pprof at ${PPROF_BASE} ..."
+for i in {1..100}; do
+  if curl -fsS "${PPROF_BASE}/debug/pprof/" -o /dev/null 2>/dev/null; then
+    READY=1
+    break
+  fi
+  sleep 0.2
+done
+if [[ -z "${READY:-}" ]]; then
+  echo "[run-relay-pprof] ERROR: pprof HTTP server not reachable at ${PPROF_BASE}." >&2
+  echo "[run-relay-pprof] Check that ${LISTEN_HOST} is a local bindable address." >&2
+  # Attempt to dump recent logs for context
+  tail -n 100 "$LOG_FILE" || true
+  # Try INT to clean up
+  killall -INT next.orly.dev 2>/dev/null || true
+  exit 1
+fi
+
+# Open the HTTP pprof UI
+( xdg-open "${PPROF_BASE}/debug/pprof/" >/dev/null 2>&1 || true ) &
+
+echo "[run-relay-pprof] Collecting CPU profile via HTTP for ${DURATION}s ..."
+# The HTTP /debug/pprof/profile endpoint records CPU for the provided seconds
+# and returns a pprof file without needing to stop the process.
+curl -fsS --max-time $((DURATION+10)) \
+  "${PPROF_BASE}/debug/pprof/profile?seconds=${DURATION}" \
+  -o "$PPROF_DIR/cpu.pprof" || true
+
+echo "[run-relay-pprof] Sending SIGINT (Ctrl+C) for graceful shutdown ..."
+killall -INT next.orly.dev 2>/dev/null || true
+
+# Wait up to ~60s for graceful shutdown so defers (pprof Stop) can run
+for i in {1..300}; do
+  if ! pgrep -x next.orly.dev >/dev/null 2>&1; then
+    break
+  fi
+  sleep 0.2
+done
+
+# Try HTTP shutdown if still running (ensures defer paths can run)
+if pgrep -x next.orly.dev >/dev/null 2>&1; then
+  echo "[run-relay-pprof] Still running, requesting /shutdown ..."
+  curl -fsS --max-time 2 "http://10.0.0.2:${HEALTH_PORT}/shutdown" >/dev/null 2>&1 || true
+  for i in {1..150}; do
+    if ! pgrep -x next.orly.dev >/dev/null 2>&1; then
+      break
+    fi
+    sleep 0.2
+  done
+fi
+if pgrep -x next.orly.dev >/dev/null 2>&1; then
+  echo "[run-relay-pprof] Escalating: sending SIGTERM ..."
+  killall -TERM next.orly.dev 2>/dev/null || true
+  for i in {1..150}; do
+    if ! pgrep -x next.orly.dev >/dev/null 2>&1; then
+      break
+    fi
+    sleep 0.2
+  done
+fi
+if pgrep -x next.orly.dev >/dev/null 2>&1; then
+  echo "[run-relay-pprof] Force kill: sending SIGKILL ..."
+  killall -KILL next.orly.dev 2>/dev/null || true
+fi
+
+PPROF_FILE="$PPROF_DIR/cpu.pprof"
+if [[ ! -s "$PPROF_FILE" ]]; then
+  echo "[run-relay-pprof] ERROR: HTTP CPU profile not captured (file empty)." >&2
+  echo "[run-relay-pprof] Hint: Ensure ORLY_PPROF_HTTP=true and port 6060 is reachable." >&2
+  exit 1
+fi
+
+echo "[run-relay-pprof] Detected profile file: $PPROF_FILE"
+echo "[run-relay-pprof] Launching 'go tool pprof' web UI on :8000 ..."
+
+# Try to open a browser automatically, ignore failures
+( sleep 0.6; xdg-open "http://localhost:8000" >/dev/null 2>&1 || true ) &
+
+exec go tool pprof -http=:8000 "$BIN_PATH" "$PPROF_FILE"
+
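The script's central step is the curl against /debug/pprof/profile?seconds=N, which records CPU for the requested window and streams back a pprof file without stopping the relay. The same fetch is easy to do programmatically; a minimal Go sketch under the script's assumptions (pprof served on 10.0.0.2:6060, a 60-second window, and a hypothetical output name cpu.pprof):

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    	"os"
    	"time"
    )

    func main() {
    	const seconds = 60
    	// Allow the full profiling window plus slack, like curl --max-time.
    	client := &http.Client{Timeout: (seconds + 10) * time.Second}
    	resp, err := client.Get(
    		fmt.Sprintf("http://10.0.0.2:6060/debug/pprof/profile?seconds=%d", seconds),
    	)
    	if err != nil {
    		fmt.Fprintln(os.Stderr, "profile fetch failed:", err)
    		os.Exit(1)
    	}
    	defer resp.Body.Close()
    	out, err := os.Create("cpu.pprof")
    	if err != nil {
    		fmt.Fprintln(os.Stderr, err)
    		os.Exit(1)
    	}
    	defer out.Close()
    	if _, err := io.Copy(out, resp.Body); err != nil {
    		fmt.Fprintln(os.Stderr, "writing profile failed:", err)
    		os.Exit(1)
    	}
    	fmt.Println("wrote cpu.pprof; inspect with: go tool pprof -http=:8000 cpu.pprof")
    }

As in the script, the client timeout must cover the whole window, since the server only responds after profiling completes.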