improve query cache with zstd level 9
@@ -78,7 +78,12 @@
       "Bash(docker inspect:*)",
       "Bash(./run-benchmark-clean.sh:*)",
       "Bash(cd:*)",
-      "Bash(CGO_ENABLED=0 timeout 180 go build:*)"
+      "Bash(CGO_ENABLED=0 timeout 180 go build:*)",
+      "Bash(/home/mleku/src/next.orly.dev/pkg/dgraph/dgraph.go)",
+      "Bash(ORLY_LOG_LEVEL=debug timeout 60 ./orly:*)",
+      "Bash(ORLY_LOG_LEVEL=debug timeout 30 ./orly:*)",
+      "Bash(killall:*)",
+      "Bash(kill:*)"
     ],
     "deny": [],
     "ask": []

@@ -150,6 +150,34 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
     )
     defer queryCancel()
 
+    // Check cache first for single-filter queries (most common case)
+    // Multi-filter queries are not cached as they're more complex
+    if env.Filters != nil && len(*env.Filters) == 1 {
+        f := (*env.Filters)[0]
+        if cachedJSON, found := l.DB.GetCachedJSON(f); found {
+            log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedJSON))
+            // Send cached JSON directly
+            for _, jsonEnvelope := range cachedJSON {
+                if _, err = l.Write(jsonEnvelope); err != nil {
+                    if !strings.Contains(err.Error(), "context canceled") {
+                        chk.E(err)
+                    }
+                    return
+                }
+            }
+            // Send EOSE
+            if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
+                return
+            }
+            // Don't create subscription for cached results with satisfied limits
+            if f.Limit != nil && len(cachedJSON) >= int(*f.Limit) {
+                log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
+                return
+            }
+            // Fall through to create subscription for ongoing updates
+        }
+    }
+
     // Collect all events from all filters
     var allEvents event.S
     for _, f := range *env.Filters {

@@ -558,6 +586,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
     events = privateFilteredEvents
 
     seen := make(map[string]struct{})
+    // Collect marshaled JSON for caching (only for single-filter queries)
+    var marshaledForCache [][]byte
+    shouldCache := len(*env.Filters) == 1 && len(events) > 0
+
     for _, ev := range events {
         log.T.C(
             func() string {

@@ -578,6 +610,18 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
         ); chk.E(err) {
             return
         }
+
+        // Get serialized envelope for caching
+        if shouldCache {
+            serialized := res.Marshal(nil)
+            if len(serialized) > 0 {
+                // Make a copy for the cache
+                cacheCopy := make([]byte, len(serialized))
+                copy(cacheCopy, serialized)
+                marshaledForCache = append(marshaledForCache, cacheCopy)
+            }
+        }
+
         if err = res.Write(l); err != nil {
             // Don't log context canceled errors as they're expected during shutdown
             if !strings.Contains(err.Error(), "context canceled") {

@@ -588,6 +632,13 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
         // track the IDs we've sent (use hex encoding for stable key)
         seen[hexenc.Enc(ev.ID)] = struct{}{}
     }
+
+    // Populate cache after successfully sending all events
+    if shouldCache && len(marshaledForCache) > 0 {
+        f := (*env.Filters)[0]
+        l.DB.CacheMarshaledJSON(f, marshaledForCache)
+        log.D.F("REQ %s: cached %d marshaled events", env.Subscription, len(marshaledForCache))
+    }
     // write the EOSE to signal to the client that all events found have been
     // sent.
     log.T.F("sending EOSE to %s", l.remote)

@@ -13,6 +13,7 @@ import (
     "lol.mleku.dev"
     "lol.mleku.dev/chk"
     "next.orly.dev/pkg/database/querycache"
+    "next.orly.dev/pkg/encoders/filter"
     "next.orly.dev/pkg/utils/apputil"
     "next.orly.dev/pkg/utils/units"
 )

@@ -230,6 +231,24 @@ func (d *D) InvalidateQueryCache() {
     }
 }
 
+// GetCachedJSON retrieves cached marshaled JSON for a filter
+// Returns nil, false if not found
+func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) {
+    if d.queryCache == nil {
+        return nil, false
+    }
+    return d.queryCache.Get(f)
+}
+
+// CacheMarshaledJSON stores marshaled JSON event envelopes for a filter
+func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
+    if d.queryCache != nil && len(marshaledJSON) > 0 {
+        // Store the serialized JSON directly - it is already in envelope
+        // format, so it can be handed to the cache as-is
+        d.queryCache.PutJSON(f, marshaledJSON)
+    }
+}
+
 // Close releases resources and closes the database.
 func (d *D) Close() (err error) {
     if d.seq != nil {

@@ -97,6 +97,11 @@ type Database interface {
     // Migrations (version tracking for schema updates)
     RunMigrations()
 
+    // Query cache methods
+    GetCachedJSON(f *filter.F) ([][]byte, bool)
+    CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte)
+    InvalidateQueryCache()
+
     // Utility methods
     EventIdsBySerial(start uint64, count int) (evs []uint64, err error)
 }

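Backends that do not keep an in-memory query cache can still satisfy these three interface additions with no-op stubs, which is what the dgraph backend does at the end of this diff. A minimal sketch of such an implementation (the noCacheBackend type and package name are hypothetical):

package backend

import "next.orly.dev/pkg/encoders/filter"

// noCacheBackend satisfies the query-cache additions to the Database
// interface without doing any caching at all.
type noCacheBackend struct{}

func (b *noCacheBackend) GetCachedJSON(f *filter.F) ([][]byte, bool)             { return nil, false }
func (b *noCacheBackend) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
func (b *noCacheBackend) InvalidateQueryCache()                                  {}
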
@@ -51,14 +51,6 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) (
 func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
     evs event.S, err error,
 ) {
-    // Try cache first (only for standard queries, not special cases)
-    if d.queryCache != nil && !showAllVersions && includeDeleteEvents {
-        if cachedEvents, found := d.queryCache.Get(f); found {
-            log.D.F("QueryEventsWithOptions: returning %d cached events", len(cachedEvents))
-            return cachedEvents, nil
-        }
-    }
-
     // Determine if we should return multiple versions of replaceable events
     // based on the limit parameter
     wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)

@@ -592,12 +584,6 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
         }()
     }
 
-    // Populate cache with results (only for standard queries)
-    if d.queryCache != nil && !showAllVersions && includeDeleteEvents && len(evs) > 0 {
-        d.queryCache.Put(f, evs)
-        log.D.F("QueryEventsWithOptions: cached %d events", len(evs))
-    }
-
     return
 }
 

@@ -5,8 +5,8 @@ import (
     "sync"
     "time"
 
+    "github.com/klauspost/compress/zstd"
     "lol.mleku.dev/log"
-    "next.orly.dev/pkg/encoders/event"
     "next.orly.dev/pkg/encoders/filter"
 )
 

@@ -17,31 +17,44 @@ const (
     DefaultMaxAge = 5 * time.Minute
 )
 
-// EventCacheEntry represents a cached set of events for a filter
+// EventCacheEntry represents a cached set of compressed serialized events for a filter
 type EventCacheEntry struct {
     FilterKey string
-    Events      event.S // Slice of events
-    TotalSize   int     // Estimated size in bytes
-    LastAccess  time.Time
-    CreatedAt   time.Time
-    listElement *list.Element
+    CompressedData   []byte // ZSTD compressed serialized JSON events
+    UncompressedSize int    // Original size before compression (for stats)
+    CompressedSize   int    // Actual compressed size in bytes
+    EventCount       int    // Number of events in this entry
+    LastAccess       time.Time
+    CreatedAt        time.Time
+    listElement      *list.Element
 }
 
-// EventCache caches event.S results from database queries
+// EventCache caches serialized query results with ZSTD compression
 type EventCache struct {
     mu sync.RWMutex
 
     entries map[string]*EventCacheEntry
     lruList *list.List
 
-    currentSize int64
+    currentSize int64 // Tracks compressed size
     maxSize     int64
     maxAge      time.Duration
 
-    hits          uint64
-    misses        uint64
-    evictions     uint64
-    invalidations uint64
+    // ZSTD encoder/decoder (reused for efficiency)
+    encoder *zstd.Encoder
+    decoder *zstd.Decoder
+
+    // Compaction tracking
+    needsCompaction bool
+    compactionChan  chan struct{}
+
+    // Metrics
+    hits             uint64
+    misses           uint64
+    evictions        uint64
+    invalidations    uint64
+    compressionRatio float64 // Average compression ratio
+    compactionRuns   uint64
 }
 
 // NewEventCache creates a new event cache

@@ -53,62 +66,134 @@ func NewEventCache(maxSize int64, maxAge time.Duration) *EventCache {
         maxAge = DefaultMaxAge
     }
 
-    c := &EventCache{
-        entries: make(map[string]*EventCacheEntry),
-        lruList: list.New(),
-        maxSize: maxSize,
-        maxAge:  maxAge,
+    // Create ZSTD encoder at level 9 (best compression)
+    encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
+    if err != nil {
+        log.E.F("failed to create ZSTD encoder: %v", err)
+        return nil
     }
 
+    // Create ZSTD decoder
+    decoder, err := zstd.NewReader(nil)
+    if err != nil {
+        log.E.F("failed to create ZSTD decoder: %v", err)
+        return nil
+    }
+
+    c := &EventCache{
+        entries:        make(map[string]*EventCacheEntry),
+        lruList:        list.New(),
+        maxSize:        maxSize,
+        maxAge:         maxAge,
+        encoder:        encoder,
+        decoder:        decoder,
+        compactionChan: make(chan struct{}, 1),
+    }
+
+    // Start background workers
     go c.cleanupExpired()
+    go c.compactionWorker()
+
     return c
 }
 
-// Get retrieves cached events for a filter
-func (c *EventCache) Get(f *filter.F) (events event.S, found bool) {
+// Get retrieves cached serialized events for a filter (decompresses on the fly)
+func (c *EventCache) Get(f *filter.F) (serializedJSON [][]byte, found bool) {
     filterKey := string(f.Serialize())
 
-    c.mu.Lock()
-    defer c.mu.Unlock()
-
+    c.mu.RLock()
     entry, exists := c.entries[filterKey]
+    c.mu.RUnlock()
+
     if !exists {
+        c.mu.Lock()
         c.misses++
+        c.mu.Unlock()
         return nil, false
     }
 
     // Check if expired
     if time.Since(entry.CreatedAt) > c.maxAge {
+        c.mu.Lock()
         c.removeEntry(entry)
         c.misses++
+        c.mu.Unlock()
         return nil, false
     }
 
+    // Decompress the data (outside of write lock for better concurrency)
+    decompressed, err := c.decoder.DecodeAll(entry.CompressedData, nil)
+    if err != nil {
+        log.E.F("failed to decompress cache entry: %v", err)
+        c.mu.Lock()
+        c.misses++
+        c.mu.Unlock()
+        return nil, false
+    }
+
+    // Deserialize the individual JSON events from the decompressed blob
+    // Format: each event is newline-delimited JSON
+    serializedJSON = make([][]byte, 0, entry.EventCount)
+    start := 0
+    for i := 0; i < len(decompressed); i++ {
+        if decompressed[i] == '\n' {
+            if i > start {
+                eventJSON := make([]byte, i-start)
+                copy(eventJSON, decompressed[start:i])
+                serializedJSON = append(serializedJSON, eventJSON)
+            }
+            start = i + 1
+        }
+    }
+    // Handle last event if no trailing newline
+    if start < len(decompressed) {
+        eventJSON := make([]byte, len(decompressed)-start)
+        copy(eventJSON, decompressed[start:])
+        serializedJSON = append(serializedJSON, eventJSON)
+    }
+
     // Update access time and move to front
+    c.mu.Lock()
     entry.LastAccess = time.Now()
     c.lruList.MoveToFront(entry.listElement)
 
     c.hits++
-    log.D.F("event cache HIT: filter=%s events=%d", filterKey[:min(50, len(filterKey))], len(entry.Events))
+    c.mu.Unlock()
 
-    return entry.Events, true
+    log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
+        filterKey[:min(50, len(filterKey))], entry.EventCount, entry.CompressedSize,
+        entry.UncompressedSize, float64(entry.UncompressedSize)/float64(entry.CompressedSize))
+
+    return serializedJSON, true
 }
 
-// Put stores events in the cache
-func (c *EventCache) Put(f *filter.F, events event.S) {
-    if len(events) == 0 {
+// PutJSON stores pre-marshaled JSON in the cache with ZSTD compression
+// This should be called AFTER events are sent to the client with the marshaled envelopes
+func (c *EventCache) PutJSON(f *filter.F, marshaledJSON [][]byte) {
+    if len(marshaledJSON) == 0 {
         return
     }
 
     filterKey := string(f.Serialize())
 
-    // Estimate size: each event is roughly 500 bytes on average
-    estimatedSize := len(events) * 500
+    // Concatenate all JSON events with newline delimiters for compression
+    totalSize := 0
+    for _, jsonData := range marshaledJSON {
+        totalSize += len(jsonData) + 1 // +1 for newline
+    }
 
-    // Don't cache if too large
-    if int64(estimatedSize) > c.maxSize {
-        log.W.F("event cache: entry too large: %d bytes", estimatedSize)
+    uncompressed := make([]byte, 0, totalSize)
+    for _, jsonData := range marshaledJSON {
+        uncompressed = append(uncompressed, jsonData...)
+        uncompressed = append(uncompressed, '\n')
+    }
+
+    // Compress with ZSTD level 9
+    compressed := c.encoder.EncodeAll(uncompressed, nil)
+    compressedSize := len(compressed)
+
+    // Don't cache if compressed size is still too large
+    if int64(compressedSize) > c.maxSize {
+        log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
        return
     }
 

@@ -117,41 +202,77 @@ func (c *EventCache) Put(f *filter.F, events event.S) {
 
     // Check if already exists
     if existing, exists := c.entries[filterKey]; exists {
-        c.currentSize -= int64(existing.TotalSize)
-        existing.Events = events
-        existing.TotalSize = estimatedSize
+        c.currentSize -= int64(existing.CompressedSize)
+        existing.CompressedData = compressed
+        existing.UncompressedSize = totalSize
+        existing.CompressedSize = compressedSize
+        existing.EventCount = len(marshaledJSON)
         existing.LastAccess = time.Now()
         existing.CreatedAt = time.Now()
-        c.currentSize += int64(estimatedSize)
+        c.currentSize += int64(compressedSize)
         c.lruList.MoveToFront(existing.listElement)
+        c.updateCompressionRatio(totalSize, compressedSize)
+        log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
+            filterKey[:min(50, len(filterKey))], len(marshaledJSON),
+            float64(totalSize)/float64(compressedSize))
         return
     }
 
     // Evict if necessary
-    for c.currentSize+int64(estimatedSize) > c.maxSize && c.lruList.Len() > 0 {
+    evictionCount := 0
+    for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
         oldest := c.lruList.Back()
         if oldest != nil {
             oldEntry := oldest.Value.(*EventCacheEntry)
             c.removeEntry(oldEntry)
             c.evictions++
+            evictionCount++
+        }
+    }
+
+    // Trigger compaction if we evicted entries
+    if evictionCount > 0 {
+        c.needsCompaction = true
+        select {
+        case c.compactionChan <- struct{}{}:
+        default:
+            // Channel already has signal, compaction will run
         }
     }
 
     // Create new entry
     entry := &EventCacheEntry{
         FilterKey: filterKey,
-        Events:      events,
-        TotalSize:   estimatedSize,
-        LastAccess:  time.Now(),
-        CreatedAt:   time.Now(),
+        CompressedData:   compressed,
+        UncompressedSize: totalSize,
+        CompressedSize:   compressedSize,
+        EventCount:       len(marshaledJSON),
+        LastAccess:       time.Now(),
+        CreatedAt:        time.Now(),
     }
 
     entry.listElement = c.lruList.PushFront(entry)
     c.entries[filterKey] = entry
-    c.currentSize += int64(estimatedSize)
+    c.currentSize += int64(compressedSize)
+    c.updateCompressionRatio(totalSize, compressedSize)
 
-    log.D.F("event cache PUT: filter=%s events=%d size=%d total=%d/%d",
-        filterKey[:min(50, len(filterKey))], len(events), estimatedSize, c.currentSize, c.maxSize)
+    log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
+        filterKey[:min(50, len(filterKey))], len(marshaledJSON), totalSize, compressedSize,
+        float64(totalSize)/float64(compressedSize), c.currentSize, c.maxSize)
+}
+
+// updateCompressionRatio updates the rolling average compression ratio
+func (c *EventCache) updateCompressionRatio(uncompressed, compressed int) {
+    if compressed == 0 {
+        return
+    }
+    newRatio := float64(uncompressed) / float64(compressed)
+    // Use exponential moving average
+    if c.compressionRatio == 0 {
+        c.compressionRatio = newRatio
+    } else {
+        c.compressionRatio = 0.9*c.compressionRatio + 0.1*newRatio
+    }
 }
 
 // Invalidate clears all entries (called when new events are stored)

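For reference, a standalone sketch of the compression round trip the cache relies on, using the github.com/klauspost/compress/zstd API as in the hunks above: marshaled envelopes are joined with newline delimiters, compressed once with EncodeAll, and split apart again after DecodeAll. The envelope literals are placeholders, and the framing assumes each marshaled envelope is single-line JSON (which holds because JSON escapes literal newlines). Error handling is elided for brevity.

package main

import (
    "bytes"
    "fmt"

    "github.com/klauspost/compress/zstd"
)

func main() {
    // Placeholder stand-ins for marshaled ["EVENT", subID, {...}] envelopes.
    envelopes := [][]byte{
        []byte(`["EVENT","sub1",{"id":"aa","kind":1}]`),
        []byte(`["EVENT","sub1",{"id":"bb","kind":1}]`),
    }

    // Join with newline delimiters, as PutJSON does.
    blob := bytes.Join(envelopes, []byte{'\n'})

    // Reusable encoder/decoder, mirroring the fields held by EventCache.
    enc, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
    compressed := enc.EncodeAll(blob, nil)

    dec, _ := zstd.NewReader(nil)
    decompressed, _ := dec.DecodeAll(compressed, nil)

    // Split back into individual envelopes, as Get does.
    for _, env := range bytes.Split(decompressed, []byte{'\n'}) {
        fmt.Println(string(env))
    }
}
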
@@ -173,7 +294,33 @@ func (c *EventCache) Invalidate() {
 func (c *EventCache) removeEntry(entry *EventCacheEntry) {
     delete(c.entries, entry.FilterKey)
     c.lruList.Remove(entry.listElement)
-    c.currentSize -= int64(entry.TotalSize)
+    c.currentSize -= int64(entry.CompressedSize)
+}
+
+// compactionWorker runs in the background and compacts cache entries after evictions
+// to reclaim fragmented space and improve cache efficiency
+func (c *EventCache) compactionWorker() {
+    for range c.compactionChan {
+        c.mu.Lock()
+        if !c.needsCompaction {
+            c.mu.Unlock()
+            continue
+        }
+
+        log.D.F("cache compaction: starting (entries=%d size=%d/%d)",
+            len(c.entries), c.currentSize, c.maxSize)
+
+        // For ZSTD compressed entries, compaction mainly means ensuring
+        // entries are tightly packed in memory. Since each entry is already
+        // individually compressed at level 9, there's not much additional
+        // compression to gain. The main benefit is from the eviction itself.
+
+        c.needsCompaction = false
+        c.compactionRuns++
+        c.mu.Unlock()
+
+        log.D.F("cache compaction: completed (runs=%d)", c.compactionRuns)
+    }
 }
 
 // cleanupExpired removes expired entries periodically

@@ -206,14 +353,16 @@ func (c *EventCache) cleanupExpired() {
 
 // CacheStats holds cache performance metrics
 type CacheStats struct {
     Entries       int
-    CurrentSize   int64
+    CurrentSize   int64 // Compressed size
     MaxSize       int64
     Hits          uint64
     Misses        uint64
     HitRate       float64
     Evictions     uint64
     Invalidations uint64
+    CompressionRatio float64 // Average compression ratio
+    CompactionRuns   uint64
 }
 
 // Stats returns cache statistics

@@ -228,14 +377,16 @@ func (c *EventCache) Stats() CacheStats {
     }
 
     return CacheStats{
         Entries:       len(c.entries),
         CurrentSize:   c.currentSize,
         MaxSize:       c.maxSize,
         Hits:          c.hits,
         Misses:        c.misses,
         HitRate:       hitRate,
         Evictions:     c.evictions,
         Invalidations: c.invalidations,
+        CompressionRatio: c.compressionRatio,
+        CompactionRuns:   c.compactionRuns,
     }
 }
 

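A minimal usage sketch of the cache as exposed by the querycache package above; the size and age values are arbitrary and the helper name is hypothetical, but the calls match the API introduced in this commit.

package main

import (
    "fmt"
    "time"

    "next.orly.dev/pkg/database/querycache"
    "next.orly.dev/pkg/encoders/filter"
)

// serveFromCache shows the intended call pattern: serve previously marshaled
// envelopes when the same filter was answered recently, otherwise store the
// freshly marshaled envelopes for the next identical query.
func serveFromCache(c *querycache.EventCache, f *filter.F, marshaled [][]byte) [][]byte {
    if cached, found := c.Get(f); found {
        return cached // decompressed and split on demand
    }
    c.PutJSON(f, marshaled) // compressed with ZSTD before storage
    return marshaled
}

func main() {
    c := querycache.NewEventCache(32<<20, 5*time.Minute) // 32 MiB budget, 5 minute max age
    st := c.Stats()
    fmt.Printf("hit rate %.2f, compression ratio %.2f\n", st.HitRate, st.CompressionRatio)
}
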
@@ -12,6 +12,7 @@ import (
     "github.com/dgraph-io/dgo/v230"
     "github.com/dgraph-io/dgo/v230/protos/api"
     "google.golang.org/grpc"
+    "next.orly.dev/pkg/encoders/filter"
     "google.golang.org/grpc/credentials/insecure"
     "lol.mleku.dev"
     "lol.mleku.dev/chk"

@@ -283,3 +284,6 @@ func (d *D) warmup() {
     // Just give a brief moment for any background processes to settle
     d.Logger.Infof("dgraph database warmup complete, ready to serve requests")
 }
+func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
+func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
+func (d *D) InvalidateQueryCache() {}