- BBolt B+tree backend with sequential access patterns for spinning disks
- Write batching (5000 events / 128MB / 30s flush) to reduce disk thrashing
- Adjacency list storage for graph data (one key per vertex, not per edge)
- Bloom filter for fast negative edge existence checks (~12MB for 10M edges)
- No query cache (saves RAM, B+tree reads are fast enough on HDD)
- Migration tool: orly migrate --from badger --to bbolt
- Configuration: ORLY_BBOLT_* environment variables

Files modified:
- app/config/config.go: Added BBolt configuration options
- main.go: Added migrate subcommand and BBolt config wiring
- pkg/database/factory.go: Added BBolt factory registration
- pkg/bbolt/*: New BBolt database backend implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
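The batching thresholds above map onto the `BboltConfig` fields consumed by `NewWriteBatcher` in the file below. A minimal sketch of those defaults, assuming the remaining wiring (database handle, bloom filter, logger, and the ORLY_BBOLT_* parsing in app/config) lives elsewhere; `exampleBatchConfig` is a hypothetical helper, not part of the backend:

```go
package bbolt

import "time"

// exampleBatchConfig is illustrative only: the field names come from
// NewWriteBatcher below, the values from the commit description
// (5000 events / 128 MB / 30 s flush).
func exampleBatchConfig() *BboltConfig {
	return &BboltConfig{
		BatchMaxEvents:    5000,             // flush after 5000 pending events
		BatchMaxBytes:     128 << 20,        // ...or 128 MB of pending data
		BatchFlushTimeout: 30 * time.Second, // ...or 30 s since the last flush
	}
}
```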
//go:build !(js && wasm)

package bbolt

import (
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
	"lol.mleku.dev/chk"
)

// BatchedWrite represents a single write operation
type BatchedWrite struct {
	BucketName []byte
	Key        []byte
	Value      []byte
	IsDelete   bool
}

// EventBatch represents a complete event with all its indexes and graph updates
type EventBatch struct {
	Serial         uint64
	EventData      []byte                // Serialized compact event data
	Indexes        []BatchedWrite        // Index entries
	EventVertex    *EventVertex          // Graph vertex for this event
	PubkeyUpdate   *PubkeyVertexUpdate   // Update to author's pubkey vertex
	MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys
	EdgeKeys       []EdgeKey             // Edge keys for bloom filter
}

// PubkeyVertexUpdate represents an update to a pubkey's vertex
type PubkeyVertexUpdate struct {
	PubkeySerial uint64
	AddAuthored  uint64 // Event serial to add to authored (0 if none)
	AddMention   uint64 // Event serial to add to mentions (0 if none)
}

// WriteBatcher accumulates writes and flushes them in batches.
// Optimized for HDD with large batches and periodic flushes.
type WriteBatcher struct {
	db     *bolt.DB
	bloom  *EdgeBloomFilter
	logger *Logger

	mu          sync.Mutex
	pending     []*EventBatch
	pendingSize int64
	stopped     bool

	// Configuration
	maxEvents   int
	maxBytes    int64
	flushPeriod time.Duration

	// Channels for coordination
	flushCh    chan struct{}
	shutdownCh chan struct{}
	doneCh     chan struct{}

	// Stats
	stats BatcherStats
}

// BatcherStats contains batcher statistics
type BatcherStats struct {
	TotalBatches      uint64
	TotalEvents       uint64
	TotalBytes        uint64
	AverageLatencyMs  float64
	LastFlushTime     time.Time
	LastFlushDuration time.Duration
}

// NewWriteBatcher creates a new write batcher
func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher {
	wb := &WriteBatcher{
		db:          db,
		bloom:       bloom,
		logger:      logger,
		maxEvents:   cfg.BatchMaxEvents,
		maxBytes:    cfg.BatchMaxBytes,
		flushPeriod: cfg.BatchFlushTimeout,
		pending:     make([]*EventBatch, 0, cfg.BatchMaxEvents),
		flushCh:     make(chan struct{}, 1),
		shutdownCh:  make(chan struct{}),
		doneCh:      make(chan struct{}),
	}

	go wb.flushLoop()
	return wb
}
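// Illustrative lifecycle sketch, assuming db, bloom, cfg, and logger are
// wired up by the backend's open path:
//
//	wb := NewWriteBatcher(db, bloom, cfg, logger)
//	if err := wb.Add(batch); err != nil {
//		// ErrBatcherStopped: Shutdown has already been called
//	}
//	// batches accumulate and are flushed by count, size, or timer
//	_ = wb.Shutdown() // runs a final flush before returning
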
// Add adds an event batch to the pending writes
func (wb *WriteBatcher) Add(batch *EventBatch) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wb.stopped {
		return ErrBatcherStopped
	}

	wb.pending = append(wb.pending, batch)
	wb.pendingSize += int64(len(batch.EventData))
	for _, idx := range batch.Indexes {
		wb.pendingSize += int64(len(idx.Key) + len(idx.Value))
	}

	// Check thresholds
	if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes {
		wb.triggerFlush()
	}

	return nil
}

// triggerFlush signals the flush loop to flush (must be called with lock held)
func (wb *WriteBatcher) triggerFlush() {
	select {
	case wb.flushCh <- struct{}{}:
	default:
		// Already a flush pending
	}
}

// flushLoop runs the background flush timer
func (wb *WriteBatcher) flushLoop() {
	defer close(wb.doneCh)

	timer := time.NewTimer(wb.flushPeriod)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.flushCh:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
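			// Stop the timer and drain any value already queued on its
			// channel before resetting; otherwise a stale expiry could fire
			// immediately and trigger a redundant flush right after this one.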
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.shutdownCh:
			// Final flush
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: final flush error: %v", err)
			}
			return
		}
	}
}

// Flush writes all pending batches to BBolt
func (wb *WriteBatcher) Flush() error {
	wb.mu.Lock()
	if len(wb.pending) == 0 {
		wb.mu.Unlock()
		return nil
	}

	// Swap out pending slice
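	// Swapping the slice under the lock and releasing it before the bbolt
	// transaction keeps Add() callers unblocked while the batch hits disk.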
	toFlush := wb.pending
	toFlushSize := wb.pendingSize
	wb.pending = make([]*EventBatch, 0, wb.maxEvents)
	wb.pendingSize = 0
	wb.mu.Unlock()

	startTime := time.Now()

	// Collect all edge keys for bloom filter update
	var allEdgeKeys []EdgeKey
	for _, batch := range toFlush {
		allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...)
	}

	// Update bloom filter first (memory only)
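	// Per the commit notes the filter is only consulted for negative
	// existence checks, so if the transaction below fails these extra
	// entries can only cause false positives, never false negatives.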
	if len(allEdgeKeys) > 0 {
		wb.bloom.AddBatch(allEdgeKeys)
	}

	// Write all batches in a single transaction
	err := wb.db.Update(func(tx *bolt.Tx) error {
		for _, batch := range toFlush {
			// Write compact event data
			cmpBucket := tx.Bucket(bucketCmp)
			if cmpBucket != nil {
				key := makeSerialKey(batch.Serial)
				if err := cmpBucket.Put(key, batch.EventData); err != nil {
					return err
				}
			}

			// Write all indexes
			for _, idx := range batch.Indexes {
				bucket := tx.Bucket(idx.BucketName)
				if bucket == nil {
					continue
				}
				if idx.IsDelete {
					if err := bucket.Delete(idx.Key); err != nil {
						return err
					}
				} else {
					if err := bucket.Put(idx.Key, idx.Value); err != nil {
						return err
					}
				}
			}

			// Write event vertex
			if batch.EventVertex != nil {
				evBucket := tx.Bucket(bucketEv)
				if evBucket != nil {
					key := makeSerialKey(batch.Serial)
					if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil {
						return err
					}
				}
			}

			// Update pubkey vertices
			if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil {
				return err
			}
			for _, mentionUpdate := range batch.MentionUpdates {
				if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil {
					return err
				}
			}
		}
		return nil
	})

	// Update stats
	duration := time.Since(startTime)
	wb.mu.Lock()
	wb.stats.TotalBatches++
	wb.stats.TotalEvents += uint64(len(toFlush))
	wb.stats.TotalBytes += uint64(toFlushSize)
	wb.stats.LastFlushTime = time.Now()
	wb.stats.LastFlushDuration = duration
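	// Incremental running mean: avg_n = (avg_{n-1}*(n-1) + x_n) / n,
	// so no per-flush latency history needs to be kept.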
	latencyMs := float64(duration.Milliseconds())
	wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches)
	wb.mu.Unlock()

	if err == nil {
		wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration)
	}

	return err
}

// updatePubkeyVertex reads, updates, and writes a pubkey vertex
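// Graph data is stored as adjacency lists (one key per vertex, per the commit
// description), so adding an edge is a read-modify-write of a single vertex
// value rather than an insert of a per-edge key.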
func (wb *WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error {
	if update == nil {
		return nil
	}

	pvBucket := tx.Bucket(bucketPv)
	if pvBucket == nil {
		return nil
	}

	key := makeSerialKey(update.PubkeySerial)

	// Load existing vertex or create new
	var pv PubkeyVertex
	existing := pvBucket.Get(key)
	if existing != nil {
		if err := pv.Decode(existing); chk.E(err) {
			// If decode fails, start fresh
			pv = PubkeyVertex{}
		}
	}

	// Apply updates
	if update.AddAuthored != 0 {
		pv.AddAuthored(update.AddAuthored)
	}
	if update.AddMention != 0 {
		pv.AddMention(update.AddMention)
	}

	// Write back
	return pvBucket.Put(key, pv.Encode())
}

// Shutdown gracefully shuts down the batcher
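// Shutdown must be called at most once: it closes shutdownCh, and a second
// call would panic on the repeated close.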
func (wb *WriteBatcher) Shutdown() error {
	wb.mu.Lock()
	wb.stopped = true
	wb.mu.Unlock()

	close(wb.shutdownCh)
	<-wb.doneCh
	return nil
}

// Stats returns current batcher statistics
func (wb *WriteBatcher) Stats() BatcherStats {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return wb.stats
}

// PendingCount returns the number of pending events
func (wb *WriteBatcher) PendingCount() int {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return len(wb.pending)
}

// ErrBatcherStopped is returned when adding to a stopped batcher
var ErrBatcherStopped = &batcherStoppedError{}

type batcherStoppedError struct{}

func (e *batcherStoppedError) Error() string {
	return "batcher has been stopped"
}