Enhance graceful shutdown and logging for HTTP server

- Implemented graceful shutdown for the HTTP server, allowing for a controlled shutdown process with a timeout.
- Added logging for shutdown signals and server exit to improve traceability during application termination.
- Introduced IP blacklist configuration to enhance security by blocking specified IP addresses.
- Updated database cache configurations to allow dynamic adjustment via environment variables for better performance tuning.
This commit is contained in:
2025-10-21 17:20:48 +01:00
parent 15e2988222
commit 9aa1e7fab3
9 changed files with 152 additions and 17 deletions

View File

@@ -178,6 +178,33 @@ func (f *Follows) adminRelays() (urls []string) {
copy(admins, f.admins)
f.followsMx.RUnlock()
seen := make(map[string]struct{})
// Build a set of normalized self relay addresses to avoid self-connections
selfSet := make(map[string]struct{})
selfHosts := make(map[string]struct{})
if f.cfg != nil && len(f.cfg.RelayAddresses) > 0 {
for _, a := range f.cfg.RelayAddresses {
n := string(normalize.URL(a))
if n == "" {
continue
}
selfSet[n] = struct{}{}
// Also record hostname (without port) for robust matching
// Accept simple splitting; normalize.URL ensures scheme://host[:port]
host := n
if i := strings.Index(host, "://"); i >= 0 {
host = host[i+3:]
}
if j := strings.Index(host, "/"); j >= 0 {
host = host[:j]
}
if k := strings.Index(host, ":"); k >= 0 {
host = host[:k]
}
if host != "" {
selfHosts[host] = struct{}{}
}
}
}
// First, try to get relay URLs from admin kind 10002 events
for _, adm := range admins {
@@ -208,6 +235,26 @@ func (f *Follows) adminRelays() (urls []string) {
if n == "" {
continue
}
// Skip if this URL is one of our configured self relay addresses or hosts
if _, isSelf := selfSet[n]; isSelf {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
// Host match
host := n
if i := strings.Index(host, "://"); i >= 0 {
host = host[i+3:]
}
if j := strings.Index(host, "/"); j >= 0 {
host = host[:j]
}
if k := strings.Index(host, ":"); k >= 0 {
host = host[:k]
}
if _, isSelfHost := selfHosts[host]; isSelfHost {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
if _, ok := seen[n]; ok {
continue
}
@@ -228,6 +275,26 @@ func (f *Follows) adminRelays() (urls []string) {
log.W.F("invalid bootstrap relay URL: %s", relay)
continue
}
// Skip if this URL is one of our configured self relay addresses or hosts
if _, isSelf := selfSet[n]; isSelf {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
// Host match
host := n
if i := strings.Index(host, "://"); i >= 0 {
host = host[i+3:]
}
if j := strings.Index(host, "/"); j >= 0 {
host = host[:j]
}
if k := strings.Index(host, ":"); k >= 0 {
host = host[:k]
}
if _, isSelfHost := selfHosts[host]; isSelfHost {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
if _, ok := seen[n]; ok {
continue
}
@@ -359,14 +426,16 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
f3 := &filter.F{
Authors: tag.NewFromBytesSlice(authors...),
Since: oneMonthAgo,
Limit: values.ToUintPointer(1000),
Limit: values.ToUintPointer(500),
}
*ff = append(*ff, f1, f2, f3)
// Use a subscription ID for event sync (no follow lists)
subID := "event-sync"
req := reqenvelope.NewFrom([]byte(subID), ff)
reqBytes := req.Marshal(nil)
log.T.F("follows syncer: outbound REQ to %s: %s", u, string(reqBytes))
if err = c.Write(
ctx, websocket.MessageText, req.Marshal(nil),
ctx, websocket.MessageText, reqBytes,
); chk.E(err) {
log.W.F(
"follows syncer: failed to send event REQ to %s: %v", u, err,
@@ -623,7 +692,9 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
// Use a specific subscription ID for follow list fetching
subID := "follow-lists-fetch"
req := reqenvelope.NewFrom([]byte(subID), ff)
if err = c.Write(ctx, websocket.MessageText, req.Marshal(nil)); chk.E(err) {
reqBytes := req.Marshal(nil)
log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes))
if err = c.Write(ctx, websocket.MessageText, reqBytes); chk.E(err) {
log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err)
return
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"os"
"path/filepath"
"strconv"
"time"
"github.com/dgraph-io/badger/v4"
@@ -52,18 +53,29 @@ func New(
}
opts := badger.DefaultOptions(d.dataDir)
// Use sane defaults to avoid excessive memory usage during startup.
// Badger's default BlockSize is small (e.g., 4KB). Overriding it to very large values
// can cause massive allocations and OOM panics during deployments.
// Set BlockCacheSize to a moderate value and keep BlockSize small.
opts.BlockCacheSize = int64(256 * units.Mb) // 256 MB cache
opts.BlockSize = 4 * units.Kb // 4 KB block size
// Configure caches based on environment to better match workload.
// Defaults aim for higher hit ratios under read-heavy workloads while remaining safe.
var blockCacheMB = 512 // default 512 MB
var indexCacheMB = 256 // default 256 MB
if v := os.Getenv("ORLY_DB_BLOCK_CACHE_MB"); v != "" {
if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
blockCacheMB = n
}
}
if v := os.Getenv("ORLY_DB_INDEX_CACHE_MB"); v != "" {
if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
indexCacheMB = n
}
}
opts.BlockCacheSize = int64(blockCacheMB * units.Mb)
opts.IndexCacheSize = int64(indexCacheMB * units.Mb)
opts.BlockSize = 4 * units.Kb // 4 KB block size
// Prevent huge allocations during table building and memtable flush.
// Badger's TableBuilder buffer is sized by BaseTableSize; ensure it's small.
opts.BaseTableSize = 64 * units.Mb // 64 MB per table (default ~2MB, increased for fewer files but safe)
opts.MemTableSize = 64 * units.Mb // 64 MB memtable to match table size
opts.BaseTableSize = 64 * units.Mb // 64 MB per table (default ~2MB, increased for fewer files but safe)
opts.MemTableSize = 64 * units.Mb // 64 MB memtable to match table size
// Keep value log files to a moderate size as well
opts.ValueLogFileSize = 256 * units.Mb // 256 MB value log files
opts.ValueLogFileSize = 256 * units.Mb // 256 MB value log files
opts.CompactL0OnClose = true
opts.LmaxCompaction = true
opts.Compression = options.None

View File

@@ -263,6 +263,7 @@ func (r *Client) ConnectWithTLS(
case wr := <-r.writeQueue:
// all write requests will go through this to prevent races
// log.D.F("{%s} sending %v\n", r.URL, string(wr.msg))
log.T.F("WS.Client: outbound message to %s: %s", r.URL, string(wr.msg))
if err = r.Connection.WriteMessage(
r.connectionContext, wr.msg,
); err != nil {

View File

@@ -171,6 +171,7 @@ func (sub *Subscription) Close() {
if sub.Client.IsConnected() {
closeMsg := closeenvelope.NewFrom(sub.id)
closeb := closeMsg.Marshal(nil)
log.T.F("WS.Subscription.Close: outbound CLOSE to %s: %s", sub.Client.URL, string(closeb))
<-sub.Client.Write(closeb)
}
}
@@ -191,6 +192,7 @@ func (sub *Subscription) Fire() (err error) {
"WS.Subscription.Fire: sending REQ id=%s filters=%d bytes=%d",
sub.GetID(), len(*sub.Filters), len(reqb),
)
log.T.F("WS.Subscription.Fire: outbound REQ to %s: %s", sub.Client.URL, string(reqb))
if err = <-sub.Client.Write(reqb); err != nil {
err = fmt.Errorf("failed to write: %w", err)
log.T.F(

View File

@@ -357,7 +357,7 @@ func (s *Spider) queryRelayForEvents(
Authors: tag.NewFromBytesSlice(chunk...),
Since: timestamp.FromUnix(startTime.Unix()),
Until: timestamp.FromUnix(endTime.Unix()),
Limit: func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
Limit: func() *uint { l := uint(500); return &l }(), // Limit to avoid overwhelming
}
// Subscribe to get events for this chunk