Add a 512 MB filter query cache that stores already-decoded recent query results.
This should noticeably improve performance for typical kind 1 client queries.
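
As a hedged illustration, separate from the change itself, the sketch below shows how a deployment might tune and observe the cache using only what this commit introduces: the ORLY_QUERY_CACHE_SIZE_MB and ORLY_QUERY_CACHE_MAX_AGE environment variables read in New(), and the QueryCacheStats method on *D. The package name cacheops and both helper functions are invented for the example.

package cacheops

import (
    "fmt"
    "os"

    "next.orly.dev/pkg/database/querycache"
)

// statsSource is any value exposing the QueryCacheStats method added in this
// commit; *D satisfies it.
type statsSource interface {
    QueryCacheStats() querycache.CacheStats
}

// configureCache sets the two knobs that New() reads once at construction.
// The values here are only examples; the defaults are 512 MB and 5 minutes.
func configureCache() {
    os.Setenv("ORLY_QUERY_CACHE_SIZE_MB", "256")
    os.Setenv("ORLY_QUERY_CACHE_MAX_AGE", "2m")
}

// reportCache prints the hit rate and memory use of the query cache.
func reportCache(db statsSource) {
    s := db.QueryCacheStats()
    fmt.Printf(
        "query cache: %d entries, %d/%d bytes, hit rate %.1f%%, %d evictions\n",
        s.Entries, s.CurrentSize, s.MaxSize, s.HitRate*100, s.Evictions,
    )
}

Setting the variables before the database is constructed is what matters, since New() reads them only once.
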
@@ -12,19 +12,21 @@ import (
    "github.com/dgraph-io/badger/v4/options"
    "lol.mleku.dev"
    "lol.mleku.dev/chk"
    "next.orly.dev/pkg/database/querycache"
    "next.orly.dev/pkg/utils/apputil"
    "next.orly.dev/pkg/utils/units"
)

// D implements the Database interface using Badger as the storage backend
type D struct {
    ctx        context.Context
    cancel     context.CancelFunc
    dataDir    string
    Logger     *logger
    *badger.DB
    seq        *badger.Sequence
    ready      chan struct{} // Closed when database is ready to serve requests
    queryCache *querycache.EventCache
}

// Ensure D implements Database interface at compile time
@@ -35,14 +37,29 @@ func New(
) (
    d *D, err error,
) {
    // Initialize query cache with configurable size (default 512MB)
    queryCacheSize := int64(512 * 1024 * 1024) // 512 MB
    if v := os.Getenv("ORLY_QUERY_CACHE_SIZE_MB"); v != "" {
        if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
            queryCacheSize = int64(n * 1024 * 1024)
        }
    }
    queryCacheMaxAge := 5 * time.Minute // Default 5 minutes
    if v := os.Getenv("ORLY_QUERY_CACHE_MAX_AGE"); v != "" {
        if duration, perr := time.ParseDuration(v); perr == nil {
            queryCacheMaxAge = duration
        }
    }

    d = &D{
        ctx:        ctx,
        cancel:     cancel,
        dataDir:    dataDir,
        Logger:     NewLogger(lol.GetLogLevel(logLevel), dataDir),
        DB:         nil,
        seq:        nil,
        ready:      make(chan struct{}),
        queryCache: querycache.NewEventCache(queryCacheSize, queryCacheMaxAge),
    }

    // Ensure the data directory exists
@@ -198,6 +215,21 @@ func (d *D) Sync() (err error) {
    return d.DB.Sync()
}

// QueryCacheStats returns statistics about the query cache
func (d *D) QueryCacheStats() querycache.CacheStats {
    if d.queryCache == nil {
        return querycache.CacheStats{}
    }
    return d.queryCache.Stats()
}

// InvalidateQueryCache clears all entries from the query cache
func (d *D) InvalidateQueryCache() {
    if d.queryCache != nil {
        d.queryCache.Invalidate()
    }
}

// Close releases resources and closes the database.
func (d *D) Close() (err error) {
    if d.seq != nil {
@@ -51,6 +51,14 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) (
func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
    evs event.S, err error,
) {
    // Try cache first (only for standard queries, not special cases)
    if d.queryCache != nil && !showAllVersions && includeDeleteEvents {
        if cachedEvents, found := d.queryCache.Get(f); found {
            log.D.F("QueryEventsWithOptions: returning %d cached events", len(cachedEvents))
            return cachedEvents, nil
        }
    }

    // Determine if we should return multiple versions of replaceable events
    // based on the limit parameter
    wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)
@@ -583,6 +591,13 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
            }
        }()
    }

    // Populate cache with results (only for standard queries)
    if d.queryCache != nil && !showAllVersions && includeDeleteEvents && len(evs) > 0 {
        d.queryCache.Put(f, evs)
        log.D.F("QueryEventsWithOptions: cached %d events", len(evs))
    }

    return
}
pkg/database/querycache/event_cache.go (new file, 247 lines)
@@ -0,0 +1,247 @@
package querycache

import (
    "container/list"
    "sync"
    "time"

    "lol.mleku.dev/log"
    "next.orly.dev/pkg/encoders/event"
    "next.orly.dev/pkg/encoders/filter"
)

const (
    // DefaultMaxSize is the default maximum cache size in bytes (512 MB)
    DefaultMaxSize = 512 * 1024 * 1024
    // DefaultMaxAge is the default maximum age for cache entries
    DefaultMaxAge = 5 * time.Minute
)

// EventCacheEntry represents a cached set of events for a filter
type EventCacheEntry struct {
    FilterKey   string
    Events      event.S // Slice of events
    TotalSize   int     // Estimated size in bytes
    LastAccess  time.Time
    CreatedAt   time.Time
    listElement *list.Element
}

// EventCache caches event.S results from database queries
type EventCache struct {
    mu sync.RWMutex

    entries map[string]*EventCacheEntry
    lruList *list.List

    currentSize int64
    maxSize     int64
    maxAge      time.Duration

    hits          uint64
    misses        uint64
    evictions     uint64
    invalidations uint64
}

// NewEventCache creates a new event cache
func NewEventCache(maxSize int64, maxAge time.Duration) *EventCache {
    if maxSize <= 0 {
        maxSize = DefaultMaxSize
    }
    if maxAge <= 0 {
        maxAge = DefaultMaxAge
    }

    c := &EventCache{
        entries: make(map[string]*EventCacheEntry),
        lruList: list.New(),
        maxSize: maxSize,
        maxAge:  maxAge,
    }

    go c.cleanupExpired()

    return c
}

// Get retrieves cached events for a filter
func (c *EventCache) Get(f *filter.F) (events event.S, found bool) {
    filterKey := string(f.Serialize())

    c.mu.Lock()
    defer c.mu.Unlock()

    entry, exists := c.entries[filterKey]
    if !exists {
        c.misses++
        return nil, false
    }

    // Check if expired
    if time.Since(entry.CreatedAt) > c.maxAge {
        c.removeEntry(entry)
        c.misses++
        return nil, false
    }

    // Update access time and move to front
    entry.LastAccess = time.Now()
    c.lruList.MoveToFront(entry.listElement)

    c.hits++
    log.D.F("event cache HIT: filter=%s events=%d", filterKey[:min(50, len(filterKey))], len(entry.Events))

    return entry.Events, true
}

// Put stores events in the cache
func (c *EventCache) Put(f *filter.F, events event.S) {
    if len(events) == 0 {
        return
    }

    filterKey := string(f.Serialize())

    // Estimate size: each event is roughly 500 bytes on average
    estimatedSize := len(events) * 500

    // Don't cache if too large
    if int64(estimatedSize) > c.maxSize {
        log.W.F("event cache: entry too large: %d bytes", estimatedSize)
        return
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    // Check if already exists
    if existing, exists := c.entries[filterKey]; exists {
        c.currentSize -= int64(existing.TotalSize)
        existing.Events = events
        existing.TotalSize = estimatedSize
        existing.LastAccess = time.Now()
        existing.CreatedAt = time.Now()
        c.currentSize += int64(estimatedSize)
        c.lruList.MoveToFront(existing.listElement)
        return
    }

    // Evict if necessary
    for c.currentSize+int64(estimatedSize) > c.maxSize && c.lruList.Len() > 0 {
        oldest := c.lruList.Back()
        if oldest != nil {
            oldEntry := oldest.Value.(*EventCacheEntry)
            c.removeEntry(oldEntry)
            c.evictions++
        }
    }

    // Create new entry
    entry := &EventCacheEntry{
        FilterKey:  filterKey,
        Events:     events,
        TotalSize:  estimatedSize,
        LastAccess: time.Now(),
        CreatedAt:  time.Now(),
    }

    entry.listElement = c.lruList.PushFront(entry)
    c.entries[filterKey] = entry
    c.currentSize += int64(estimatedSize)

    log.D.F("event cache PUT: filter=%s events=%d size=%d total=%d/%d",
        filterKey[:min(50, len(filterKey))], len(events), estimatedSize, c.currentSize, c.maxSize)
}

// Invalidate clears all entries (called when new events are stored)
func (c *EventCache) Invalidate() {
    c.mu.Lock()
    defer c.mu.Unlock()

    if len(c.entries) > 0 {
        cleared := len(c.entries)
        c.entries = make(map[string]*EventCacheEntry)
        c.lruList = list.New()
        c.currentSize = 0
        c.invalidations += uint64(cleared)
        log.T.F("event cache INVALIDATE: cleared %d entries", cleared)
    }
}

// removeEntry removes an entry (must be called with lock held)
func (c *EventCache) removeEntry(entry *EventCacheEntry) {
    delete(c.entries, entry.FilterKey)
    c.lruList.Remove(entry.listElement)
    c.currentSize -= int64(entry.TotalSize)
}

// cleanupExpired removes expired entries periodically
func (c *EventCache) cleanupExpired() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        c.mu.Lock()
        now := time.Now()
        var toRemove []*EventCacheEntry

        for _, entry := range c.entries {
            if now.Sub(entry.CreatedAt) > c.maxAge {
                toRemove = append(toRemove, entry)
            }
        }

        for _, entry := range toRemove {
            c.removeEntry(entry)
        }

        if len(toRemove) > 0 {
            log.D.F("event cache cleanup: removed %d expired entries", len(toRemove))
        }

        c.mu.Unlock()
    }
}

// CacheStats holds cache performance metrics
type CacheStats struct {
    Entries       int
    CurrentSize   int64
    MaxSize       int64
    Hits          uint64
    Misses        uint64
    HitRate       float64
    Evictions     uint64
    Invalidations uint64
}

// Stats returns cache statistics
func (c *EventCache) Stats() CacheStats {
    c.mu.RLock()
    defer c.mu.RUnlock()

    total := c.hits + c.misses
    hitRate := 0.0
    if total > 0 {
        hitRate = float64(c.hits) / float64(total)
    }

    return CacheStats{
        Entries:       len(c.entries),
        CurrentSize:   c.currentSize,
        MaxSize:       c.maxSize,
        Hits:          c.hits,
        Misses:        c.misses,
        HitRate:       hitRate,
        Evictions:     c.evictions,
        Invalidations: c.invalidations,
    }
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}
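
A hedged usage sketch, not part of the commit: it exercises EventCache directly with the cache-aside pattern that QueryEventsWithOptions and SaveEvent wire up above. NewEventCache, Get, Put, and the Invalidate behaviour come from this file; the package name, queryWithCache, newWriteAwareCache, and the loader callback are invented for illustration.

package cachesketch

import (
    "time"

    "next.orly.dev/pkg/database/querycache"
    "next.orly.dev/pkg/encoders/event"
    "next.orly.dev/pkg/encoders/filter"
)

// queryWithCache serves f from the cache when a fresh entry exists, otherwise
// it runs loader (standing in for the Badger query path) and stores the result.
func queryWithCache(
    c *querycache.EventCache,
    f *filter.F,
    loader func(*filter.F) (event.S, error),
) (event.S, error) {
    if evs, ok := c.Get(f); ok {
        return evs, nil // cache hit
    }
    evs, err := loader(f)
    if err != nil {
        return nil, err
    }
    c.Put(f, evs) // Put is a no-op for empty result sets
    return evs, nil
}

// newWriteAwareCache builds a small cache; every successful write should be
// followed by Invalidate so stale result sets are dropped, as SaveEvent does.
func newWriteAwareCache() *querycache.EventCache {
    return querycache.NewEventCache(64*1024*1024, time.Minute)
}

Because Invalidate clears every entry whenever an event is stored, cached results only pay off for bursts of identical filters between writes, which is the typical kind 1 client pattern the commit message targets.
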
@@ -5,6 +5,8 @@ import (
    "context"
    "errors"
    "fmt"
    "os"
    "strconv"
    "strings"

    "github.com/dgraph-io/badger/v4"
@@ -34,7 +36,9 @@ func (d *D) GetSerialsFromFilter(f *filter.F) (
        return
    }
    // Pre-allocate slice with estimated capacity to reduce reallocations
    sers = make(
        types.Uint40s, 0, len(idxs)*100,
    ) // Estimate 100 serials per index
    for _, idx := range idxs {
        var s types.Uint40s
        if s, err = d.GetSerialsByRange(idx); chk.E(err) {
@@ -111,13 +115,13 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
        err = errors.New("nil event")
        return
    }

    // Reject ephemeral events (kinds 20000-29999) - they should never be stored
    if ev.Kind >= 20000 && ev.Kind <= 29999 {
        err = errors.New("blocked: ephemeral events should not be stored")
        return
    }

    // check if the event already exists
    var ser *types.Uint40
    if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
@@ -176,7 +180,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
    if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) {
        return
    }
    log.T.F(
        "SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs),
        ev.ID, ev.Kind,
    )

    // Serialize event once to check size
    eventDataBuf := new(bytes.Buffer)
@@ -184,9 +191,15 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
    eventData := eventDataBuf.Bytes()

    // Determine storage strategy (Reiser4 optimizations)
    // Inline-storage threshold from the environment; defaults to 1024 bytes, set to 0 to disable
    // Typical values: 384 (conservative), 512 (recommended), 1024 (aggressive)
    smallEventThreshold := 1024
    if v := os.Getenv("ORLY_INLINE_EVENT_THRESHOLD"); v != "" {
        if n, perr := strconv.Atoi(v); perr == nil && n >= 0 {
            smallEventThreshold = n
        }
    }
    isSmallEvent := smallEventThreshold > 0 && len(eventData) <= smallEventThreshold
    isReplaceableEvent := kind.IsReplaceable(ev.Kind)
    isAddressableEvent := kind.IsParameterizedReplaceable(ev.Kind)
@@ -224,7 +237,9 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
            return
        }
        // Append size as uint16 big-endian (2 bytes for size up to 65535)
        sizeBytes := []byte{
            byte(len(eventData) >> 8), byte(len(eventData)),
        }
        keyBuf.Write(sizeBytes)
        // Append event data
        keyBuf.Write(eventData)
@@ -232,7 +247,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
        if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
            return
        }
        log.T.F(
            "SaveEvent: stored small event inline (%d bytes)",
            len(eventData),
        )
    } else {
        // Large event: store separately with evt prefix
        keyBuf := new(bytes.Buffer)
@@ -242,7 +260,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
        if err = txn.Set(keyBuf.Bytes(), eventData); chk.E(err) {
            return
        }
        log.T.F(
            "SaveEvent: stored large event separately (%d bytes)",
            len(eventData),
        )
    }

    // Additionally, store replaceable/addressable events with specialized keys for direct access
@@ -256,11 +277,15 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
        dTagHash.FromIdent(dTag.Value())

        keyBuf := new(bytes.Buffer)
        if err = indexes.AddressableEventEnc(
            pubHash, kindVal, dTagHash,
        ).MarshalWrite(keyBuf); chk.E(err) {
            return
        }
        // Append size as uint16 big-endian
        sizeBytes := []byte{
            byte(len(eventData) >> 8), byte(len(eventData)),
        }
        keyBuf.Write(sizeBytes)
        // Append event data
        keyBuf.Write(eventData)
@@ -277,11 +302,15 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
        kindVal.Set(ev.Kind)

        keyBuf := new(bytes.Buffer)
        if err = indexes.ReplaceableEventEnc(
            pubHash, kindVal,
        ).MarshalWrite(keyBuf); chk.E(err) {
            return
        }
        // Append size as uint16 big-endian
        sizeBytes := []byte{
            byte(len(eventData) >> 8), byte(len(eventData)),
        }
        keyBuf.Write(sizeBytes)
        // Append event data
        keyBuf.Write(eventData)
@@ -297,7 +326,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
    if err != nil {
        return
    }

    // Process deletion events to actually delete the referenced events
    if ev.Kind == kind.Deletion.K {
        if err = d.ProcessDelete(ev, nil); chk.E(err) {
@@ -306,5 +335,13 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
            err = nil
        }
    }

    // Invalidate query cache since a new event was stored
    // This ensures subsequent queries will see the new event
    if d.queryCache != nil {
        d.queryCache.Invalidate()
        log.T.F("SaveEvent: invalidated query cache")
    }

    return
}