Add BBolt database backend for HDD-optimized archival relays (v0.48.0)
- BBolt B+tree backend with sequential access patterns for spinning disks
- Write batching (5000 events / 128MB / 30s flush) to reduce disk thrashing
- Adjacency list storage for graph data (one key per vertex, not per edge)
- Bloom filter for fast negative edge existence checks (~12MB for 10M edges)
- No query cache (saves RAM; B+tree reads are fast enough on HDD)
- Migration tool: orly migrate --from badger --to bbolt
- Configuration: ORLY_BBOLT_* environment variables

Files modified:
- app/config/config.go: Added BBolt configuration options
- main.go: Added migrate subcommand and BBolt config wiring
- pkg/database/factory.go: Added BBolt factory registration
- pkg/bbolt/*: New BBolt database backend implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
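For scale, the quoted bloom-filter footprint is consistent with standard sizing: a ~1% false-positive rate needs about 9.6 bits per element, and 10M edges × 9.6 bits ≈ 12MB. Below is a minimal sketch of how the batch thresholds above map onto the BboltConfig consumed by NewWriteBatcher in this diff; the specific ORLY_BBOLT_* variable names in the comments are assumptions, since app/config/config.go is not shown here.

	cfg := &BboltConfig{
		BatchMaxEvents:    5000,             // e.g. ORLY_BBOLT_BATCH_MAX_EVENTS (name assumed)
		BatchMaxBytes:     128 << 20,        // 128MB; e.g. ORLY_BBOLT_BATCH_MAX_BYTES (name assumed)
		BatchFlushTimeout: 30 * time.Second, // e.g. ORLY_BBOLT_BATCH_FLUSH_TIMEOUT (name assumed)
	}
	wb := NewWriteBatcher(db, bloom, cfg, logger)
	defer wb.Shutdown()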
pkg/bbolt/batcher.go (new file, 330 lines)
@@ -0,0 +1,330 @@
//go:build !(js && wasm)

package bbolt

import (
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"

	"lol.mleku.dev/chk"
)

// BatchedWrite represents a single write operation
type BatchedWrite struct {
	BucketName []byte
	Key        []byte
	Value      []byte
	IsDelete   bool
}

// EventBatch represents a complete event with all its indexes and graph updates
type EventBatch struct {
	Serial         uint64
	EventData      []byte                // Serialized compact event data
	Indexes        []BatchedWrite        // Index entries
	EventVertex    *EventVertex          // Graph vertex for this event
	PubkeyUpdate   *PubkeyVertexUpdate   // Update to author's pubkey vertex
	MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys
	EdgeKeys       []EdgeKey             // Edge keys for bloom filter
}

// PubkeyVertexUpdate represents an update to a pubkey's vertex
type PubkeyVertexUpdate struct {
	PubkeySerial uint64
	AddAuthored  uint64 // Event serial to add to authored (0 if none)
	AddMention   uint64 // Event serial to add to mentions (0 if none)
}
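// Example (illustrative sketch): assembling a minimal EventBatch. Flush
// writes EventData under bucketCmp itself, so Indexes should carry only
// secondary index entries; the bucket and key below are hypothetical
// placeholders, not names defined in this package.
//
//	batch := &EventBatch{
//		Serial:    serial,
//		EventData: compactBytes,
//		Indexes: []BatchedWrite{
//			{BucketName: someIndexBucket, Key: someIndexKey},
//		},
//	}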
// WriteBatcher accumulates writes and flushes them in batches.
// Optimized for HDD with large batches and periodic flushes.
type WriteBatcher struct {
	db     *bolt.DB
	bloom  *EdgeBloomFilter
	logger *Logger

	mu          sync.Mutex
	pending     []*EventBatch
	pendingSize int64
	stopped     bool

	// Configuration
	maxEvents   int
	maxBytes    int64
	flushPeriod time.Duration

	// Channels for coordination
	flushCh    chan struct{}
	shutdownCh chan struct{}
	doneCh     chan struct{}

	// Stats
	stats BatcherStats
}

// BatcherStats contains batcher statistics
type BatcherStats struct {
	TotalBatches      uint64
	TotalEvents       uint64
	TotalBytes        uint64
	AverageLatencyMs  float64
	LastFlushTime     time.Time
	LastFlushDuration time.Duration
}

// NewWriteBatcher creates a new write batcher
func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher {
	wb := &WriteBatcher{
		db:          db,
		bloom:       bloom,
		logger:      logger,
		maxEvents:   cfg.BatchMaxEvents,
		maxBytes:    cfg.BatchMaxBytes,
		flushPeriod: cfg.BatchFlushTimeout,
		pending:     make([]*EventBatch, 0, cfg.BatchMaxEvents),
		flushCh:     make(chan struct{}, 1),
		shutdownCh:  make(chan struct{}),
		doneCh:      make(chan struct{}),
	}

	go wb.flushLoop()
	return wb
}
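// Usage sketch (illustrative): Add is asynchronous; it returns once the
// batch is queued, and the data is only durable after a subsequent Flush
// or Shutdown completes.
//
//	wb := NewWriteBatcher(db, bloom, cfg, logger)
//	if err := wb.Add(batch); err != nil {
//		// batcher has already been stopped
//	}
//	defer wb.Shutdown() // flushes any remaining pending events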
// Add adds an event batch to the pending writes
func (wb *WriteBatcher) Add(batch *EventBatch) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wb.stopped {
		return ErrBatcherStopped
	}

	wb.pending = append(wb.pending, batch)
	wb.pendingSize += int64(len(batch.EventData))
	for _, idx := range batch.Indexes {
		wb.pendingSize += int64(len(idx.Key) + len(idx.Value))
	}

	// Check thresholds
	if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes {
		wb.triggerFlush()
	}

	return nil
}

// triggerFlush signals the flush loop to flush (must be called with lock held)
func (wb *WriteBatcher) triggerFlush() {
	select {
	case wb.flushCh <- struct{}{}:
	default:
		// Already a flush pending
	}
}

// flushLoop runs the background flush timer
func (wb *WriteBatcher) flushLoop() {
	defer close(wb.doneCh)

	timer := time.NewTimer(wb.flushPeriod)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.flushCh:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.shutdownCh:
			// Final flush
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: final flush error: %v", err)
			}
			return
		}
	}
}

// Flush writes all pending batches to BBolt
func (wb *WriteBatcher) Flush() error {
	wb.mu.Lock()
	if len(wb.pending) == 0 {
		wb.mu.Unlock()
		return nil
	}

	// Swap out pending slice
	toFlush := wb.pending
	toFlushSize := wb.pendingSize
	wb.pending = make([]*EventBatch, 0, wb.maxEvents)
	wb.pendingSize = 0
	wb.mu.Unlock()

	startTime := time.Now()

	// Collect all edge keys for bloom filter update
	var allEdgeKeys []EdgeKey
	for _, batch := range toFlush {
		allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...)
	}

	// Update bloom filter first (memory only)
	if len(allEdgeKeys) > 0 {
		wb.bloom.AddBatch(allEdgeKeys)
	}

	// Write all batches in a single transaction
	err := wb.db.Update(func(tx *bolt.Tx) error {
		for _, batch := range toFlush {
			// Write compact event data
			cmpBucket := tx.Bucket(bucketCmp)
			if cmpBucket != nil {
				key := makeSerialKey(batch.Serial)
				if err := cmpBucket.Put(key, batch.EventData); err != nil {
					return err
				}
			}

			// Write all indexes
			for _, idx := range batch.Indexes {
				bucket := tx.Bucket(idx.BucketName)
				if bucket == nil {
					continue
				}
				if idx.IsDelete {
					if err := bucket.Delete(idx.Key); err != nil {
						return err
					}
				} else {
					if err := bucket.Put(idx.Key, idx.Value); err != nil {
						return err
					}
				}
			}

			// Write event vertex
			if batch.EventVertex != nil {
				evBucket := tx.Bucket(bucketEv)
				if evBucket != nil {
					key := makeSerialKey(batch.Serial)
					if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil {
						return err
					}
				}
			}

			// Update pubkey vertices
			if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil {
				return err
			}
			for _, mentionUpdate := range batch.MentionUpdates {
				if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil {
					return err
				}
			}
		}
		return nil
	})

	// Update stats
	duration := time.Since(startTime)
	wb.mu.Lock()
	wb.stats.TotalBatches++
	wb.stats.TotalEvents += uint64(len(toFlush))
	wb.stats.TotalBytes += uint64(toFlushSize)
	wb.stats.LastFlushTime = time.Now()
	wb.stats.LastFlushDuration = duration
	latencyMs := float64(duration.Milliseconds())
	wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches)
	wb.mu.Unlock()

	if err == nil {
		wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration)
	}

	return err
}

// updatePubkeyVertex reads, updates, and writes a pubkey vertex
func (wb *WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error {
	if update == nil {
		return nil
	}

	pvBucket := tx.Bucket(bucketPv)
	if pvBucket == nil {
		return nil
	}

	key := makeSerialKey(update.PubkeySerial)

	// Load existing vertex or create new
	var pv PubkeyVertex
	existing := pvBucket.Get(key)
	if existing != nil {
		if err := pv.Decode(existing); chk.E(err) {
			// If decode fails, start fresh
			pv = PubkeyVertex{}
		}
	}

	// Apply updates
	if update.AddAuthored != 0 {
		pv.AddAuthored(update.AddAuthored)
	}
	if update.AddMention != 0 {
		pv.AddMention(update.AddMention)
	}

	// Write back
	return pvBucket.Put(key, pv.Encode())
}

// Shutdown gracefully shuts down the batcher
func (wb *WriteBatcher) Shutdown() error {
	wb.mu.Lock()
	wb.stopped = true
	wb.mu.Unlock()

	close(wb.shutdownCh)
	<-wb.doneCh
	return nil
}

// Stats returns current batcher statistics
func (wb *WriteBatcher) Stats() BatcherStats {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return wb.stats
}

// PendingCount returns the number of pending events
func (wb *WriteBatcher) PendingCount() int {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return len(wb.pending)
}

// ErrBatcherStopped is returned when adding to a stopped batcher
var ErrBatcherStopped = &batcherStoppedError{}

type batcherStoppedError struct{}

func (e *batcherStoppedError) Error() string {
	return "batcher has been stopped"
}
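A small sketch of how an operator might surface the exported counters from within package bbolt; the helper below is illustrative and uses only the Stats, PendingCount, and Logger methods that appear in this diff.

	// logBatcherHealth reports write-batcher throughput and backlog; intended
	// to be called periodically from a monitoring goroutine.
	func logBatcherHealth(wb *WriteBatcher, logger *Logger) {
		s := wb.Stats()
		logger.Debugf("bbolt: batches=%d events=%d bytes=%d avg=%.1fms last=%v pending=%d",
			s.TotalBatches, s.TotalEvents, s.TotalBytes,
			s.AverageLatencyMs, s.LastFlushDuration, wb.PendingCount())
	}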