fix cache to disregard subscription ids
@@ -155,11 +155,15 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	// Multi-filter queries are not cached as they're more complex
 	if env.Filters != nil && len(*env.Filters) == 1 {
 		f := (*env.Filters)[0]
-		if cachedJSON, found := l.DB.GetCachedJSON(f); found {
-			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedJSON))
-			// Send cached JSON directly
-			for _, jsonEnvelope := range cachedJSON {
-				if _, err = l.Write(jsonEnvelope); err != nil {
+		if cachedEvents, found := l.DB.GetCachedEvents(f); found {
+			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedEvents))
+			// Wrap cached events with current subscription ID
+			for _, ev := range cachedEvents {
+				var res *eventenvelope.Result
+				if res, err = eventenvelope.NewResultWith(env.Subscription, ev); chk.E(err) {
+					return
+				}
+				if err = res.Write(l); err != nil {
 					if !strings.Contains(err.Error(), "context canceled") {
 						chk.E(err)
 					}
@@ -171,7 +175,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 				return
 			}
 			// Don't create subscription for cached results with satisfied limits
-			if f.Limit != nil && len(cachedJSON) >= int(*f.Limit) {
+			if f.Limit != nil && len(cachedEvents) >= int(*f.Limit) {
 				log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
 				return
 			}
@@ -551,8 +555,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	events = privateFilteredEvents

 	seen := make(map[string]struct{})
-	// Collect marshaled JSON for caching (only for single-filter queries)
-	var marshaledForCache [][]byte
+	// Cache events for single-filter queries (without subscription ID)
 	shouldCache := len(*env.Filters) == 1 && len(events) > 0

 	for _, ev := range events {
@@ -576,17 +579,6 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 			return
 		}

-		// Get serialized envelope for caching
-		if shouldCache {
-			serialized := res.Marshal(nil)
-			if len(serialized) > 0 {
-				// Make a copy for the cache
-				cacheCopy := make([]byte, len(serialized))
-				copy(cacheCopy, serialized)
-				marshaledForCache = append(marshaledForCache, cacheCopy)
-			}
-		}
-
 		if err = res.Write(l); err != nil {
 			// Don't log context canceled errors as they're expected during shutdown
 			if !strings.Contains(err.Error(), "context canceled") {
@@ -599,10 +591,11 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	}

 	// Populate cache after successfully sending all events
-	if shouldCache && len(marshaledForCache) > 0 {
+	// Cache the events themselves (not marshaled JSON with subscription ID)
+	if shouldCache && len(events) > 0 {
 		f := (*env.Filters)[0]
-		l.DB.CacheMarshaledJSON(f, marshaledForCache)
-		log.D.F("REQ %s: cached %d marshaled events", env.Subscription, len(marshaledForCache))
+		l.DB.CacheEvents(f, events)
+		log.D.F("REQ %s: cached %d events", env.Subscription, len(events))
 	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
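The reason the pre-marshaled JSON cache had to go is visible in the envelope shape: a NIP-01 EVENT message is the array ["EVENT", <subscription id>, <event>], so JSON serialized under one subscription ID replays a stale ID to every later client that gets a cache hit. A minimal standalone sketch of send-time wrapping (wrapEvent is a hypothetical stand-in for the repo's eventenvelope.NewResultWith, not its actual API):

package main

import (
	"encoding/json"
	"fmt"
)

// wrapEvent builds a NIP-01 EVENT envelope: ["EVENT", subID, event].
// The subscription ID belongs to the *current* REQ, so it must be
// applied at send time, never baked into a cache entry.
func wrapEvent(subID string, eventJSON json.RawMessage) ([]byte, error) {
	return json.Marshal([]any{"EVENT", subID, eventJSON})
}

func main() {
	ev := json.RawMessage(`{"id":"abc","kind":1,"content":"hi"}`)
	a, _ := wrapEvent("sub-1", ev) // first client's REQ
	b, _ := wrapEvent("sub-2", ev) // cache hit for a different client
	fmt.Println(string(a))         // ["EVENT","sub-1",{...}]
	fmt.Println(string(b))         // ["EVENT","sub-2",{...}]
}

Caching the bare events and wrapping per REQ, as the hunks above do, makes one cache entry valid for every subscriber with the same filter.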
@@ -13,6 +13,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database/querycache"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 	"next.orly.dev/pkg/utils/units"
@@ -253,6 +254,22 @@ func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
 	}
 }

+// GetCachedEvents retrieves cached events for a filter (without subscription ID)
+// Returns nil, false if not found
+func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) {
+	if d.queryCache == nil {
+		return nil, false
+	}
+	return d.queryCache.GetEvents(f)
+}
+
+// CacheEvents stores events for a filter (without subscription ID)
+func (d *D) CacheEvents(f *filter.F, events event.S) {
+	if d.queryCache != nil && len(events) > 0 {
+		d.queryCache.PutEvents(f, events)
+	}
+}
+
 // Close releases resources and closes the database.
 func (d *D) Close() (err error) {
 	if d.seq != nil {
@@ -100,6 +100,8 @@ type Database interface {
 	// Query cache methods
 	GetCachedJSON(f *filter.F) ([][]byte, bool)
 	CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte)
+	GetCachedEvents(f *filter.F) (event.S, bool)
+	CacheEvents(f *filter.F, events event.S)
 	InvalidateQueryCache()

 	// Utility methods
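Widening the Database interface forces every backend to supply the two new methods, even backends with no query cache; the dgraph and neo4j hunks later in this diff satisfy it with no-op stubs. A self-contained sketch of that pattern, using simplified stand-in types (F, E, S here are placeholders, not the repo's filter.F/event.E/event.S):

package main

type F struct{} // stand-in for filter.F
type E struct{} // stand-in for event.E
type S = []*E   // stand-in for event.S

type Database interface {
	GetCachedJSON(f *F) ([][]byte, bool)
	CacheMarshaledJSON(f *F, marshaledJSON [][]byte)
	GetCachedEvents(f *F) (S, bool)
	CacheEvents(f *F, events S)
	InvalidateQueryCache()
}

// Stub satisfies the interface with no-ops, mirroring the
// "not implemented" backends in this commit.
type Stub struct{}

func (Stub) GetCachedJSON(f *F) ([][]byte, bool)             { return nil, false }
func (Stub) CacheMarshaledJSON(f *F, marshaledJSON [][]byte) {}
func (Stub) GetCachedEvents(f *F) (S, bool)                  { return nil, false }
func (Stub) CacheEvents(f *F, events S)                      {}
func (Stub) InvalidateQueryCache()                           {}

var _ Database = Stub{} // compile-time interface check

func main() {}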
@@ -7,6 +7,7 @@ import (

 	"github.com/klauspost/compress/zstd"
 	"lol.mleku.dev/log"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 )

@@ -400,3 +401,186 @@ func min(a, b int) int {
 	}
 	return b
 }
+
+// GetEvents retrieves cached events for a filter (decompresses and deserializes on the fly)
+// This is the new method that returns event.E objects instead of marshaled JSON
+func (c *EventCache) GetEvents(f *filter.F) (events []*event.E, found bool) {
+	// Normalize filter by sorting to ensure consistent cache keys
+	f.Sort()
+	filterKey := string(f.Serialize())
+
+	c.mu.RLock()
+	entry, exists := c.entries[filterKey]
+	if !exists {
+		c.mu.RUnlock()
+		c.mu.Lock()
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Check if entry is expired
+	if time.Since(entry.CreatedAt) > c.maxAge {
+		c.mu.RUnlock()
+		c.mu.Lock()
+		c.removeEntry(entry)
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Decompress
+	decompressed, err := c.decoder.DecodeAll(entry.CompressedData, nil)
+	c.mu.RUnlock()
+	if err != nil {
+		log.E.F("failed to decompress cached events: %v", err)
+		c.mu.Lock()
+		c.removeEntry(entry)
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Deserialize events from newline-delimited JSON
+	events = make([]*event.E, 0, entry.EventCount)
+	start := 0
+	for i, b := range decompressed {
+		if b == '\n' {
+			if i > start {
+				ev := event.New()
+				if _, err := ev.Unmarshal(decompressed[start:i]); err != nil {
+					log.E.F("failed to unmarshal cached event: %v", err)
+					c.mu.Lock()
+					c.removeEntry(entry)
+					c.misses++
+					c.mu.Unlock()
+					return nil, false
+				}
+				events = append(events, ev)
+			}
+			start = i + 1
+		}
+	}
+
+	// Handle last event if no trailing newline
+	if start < len(decompressed) {
+		ev := event.New()
+		if _, err := ev.Unmarshal(decompressed[start:]); err != nil {
+			log.E.F("failed to unmarshal cached event: %v", err)
+			c.mu.Lock()
+			c.removeEntry(entry)
+			c.misses++
+			c.mu.Unlock()
+			return nil, false
+		}
+		events = append(events, ev)
+	}
+
+	// Update access time and move to front
+	c.mu.Lock()
+	entry.LastAccess = time.Now()
+	c.lruList.MoveToFront(entry.listElement)
+	c.hits++
+	c.mu.Unlock()
+
+	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
+		filterKey[:min(50, len(filterKey))], entry.EventCount, entry.CompressedSize,
+		entry.UncompressedSize, float64(entry.UncompressedSize)/float64(entry.CompressedSize))
+
+	return events, true
+}
+
+// PutEvents stores events in the cache with ZSTD compression
+// This should be called AFTER events are sent to the client
+func (c *EventCache) PutEvents(f *filter.F, events []*event.E) {
+	if len(events) == 0 {
+		return
+	}
+
+	// Normalize filter by sorting to ensure consistent cache keys
+	f.Sort()
+	filterKey := string(f.Serialize())
+
+	// Serialize all events as newline-delimited JSON for compression
+	totalSize := 0
+	for _, ev := range events {
+		totalSize += ev.EstimateSize() + 1 // +1 for newline
+	}
+
+	uncompressed := make([]byte, 0, totalSize)
+	for _, ev := range events {
+		uncompressed = ev.Marshal(uncompressed)
+		uncompressed = append(uncompressed, '\n')
+	}
+
+	// Compress with ZSTD level 9
+	compressed := c.encoder.EncodeAll(uncompressed, nil)
+	compressedSize := len(compressed)
+
+	// Don't cache if compressed size is still too large
+	if int64(compressedSize) > c.maxSize {
+		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
+		return
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check if already exists
+	if existing, exists := c.entries[filterKey]; exists {
+		c.currentSize -= int64(existing.CompressedSize)
+		existing.CompressedData = compressed
+		existing.UncompressedSize = len(uncompressed)
+		existing.CompressedSize = compressedSize
+		existing.EventCount = len(events)
+		existing.LastAccess = time.Now()
+		existing.CreatedAt = time.Now()
+		c.currentSize += int64(compressedSize)
+		c.lruList.MoveToFront(existing.listElement)
+		c.updateCompressionRatio(len(uncompressed), compressedSize)
+		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
+			filterKey[:min(50, len(filterKey))], len(events),
+			float64(len(uncompressed))/float64(compressedSize))
+		return
+	}
+
+	// Evict if necessary
+	evictionCount := 0
+	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
+		oldest := c.lruList.Back()
+		if oldest != nil {
+			oldEntry := oldest.Value.(*EventCacheEntry)
+			c.removeEntry(oldEntry)
+			c.evictions++
+			evictionCount++
+		}
+	}
+
+	if evictionCount > 0 {
+		c.needsCompaction = true
+		select {
+		case c.compactionChan <- struct{}{}:
+		default:
+		}
+	}
+
+	// Create new entry
+	entry := &EventCacheEntry{
+		FilterKey:        filterKey,
+		CompressedData:   compressed,
+		UncompressedSize: len(uncompressed),
+		CompressedSize:   compressedSize,
+		EventCount:       len(events),
+		LastAccess:       time.Now(),
+		CreatedAt:        time.Now(),
+	}
+
+	entry.listElement = c.lruList.PushFront(entry)
+	c.entries[filterKey] = entry
+	c.currentSize += int64(compressedSize)
+	c.updateCompressionRatio(len(uncompressed), compressedSize)
+
+	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
+		filterKey[:min(50, len(filterKey))], len(events), len(uncompressed), compressedSize,
+		float64(len(uncompressed))/float64(compressedSize), c.currentSize, c.maxSize)
+}
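PutEvents frames each filter's result set as newline-delimited event JSON and compresses it once per cache entry; GetEvents reverses this, splitting on '\n' and handling a final record with no trailing newline. A standalone round-trip sketch using the same klauspost/compress calls (SpeedBestCompression is an assumption for the diff's "level 9" comment, since klauspost exposes named speed levels rather than numeric zstd levels):

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Two events framed as newline-delimited JSON, the same layout
	// PutEvents builds before compressing one entry per filter key.
	ndjson := []byte(`{"id":"a","kind":1}` + "\n" + `{"id":"b","kind":7}` + "\n")

	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
	if err != nil {
		panic(err)
	}
	compressed := enc.EncodeAll(ndjson, nil)

	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}
	plain, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		panic(err)
	}

	// Split on '\n' as GetEvents does; TrimRight stands in for its
	// explicit "last event without trailing newline" branch.
	for _, line := range bytes.Split(bytes.TrimRight(plain, "\n"), []byte{'\n'}) {
		fmt.Println(string(line))
	}
}

Compressing the whole batch rather than each event separately lets zstd exploit redundancy across events with similar tags and pubkeys, which is where the logged compression ratio comes from.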
@@ -16,6 +16,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 )
@@ -283,4 +284,6 @@ func (d *D) warmup() {
 }
 func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
 func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
+func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
+func (d *D) CacheEvents(f *filter.F, events event.S) {}
 func (d *D) InvalidateQueryCache() {}
@@ -13,6 +13,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 )
@@ -273,5 +274,11 @@ func (n *N) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
 // CacheMarshaledJSON caches marshaled JSON results (not implemented for Neo4j)
 func (n *N) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}

+// GetCachedEvents retrieves cached events (not implemented for Neo4j)
+func (n *N) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
+
+// CacheEvents caches events (not implemented for Neo4j)
+func (n *N) CacheEvents(f *filter.F, events event.S) {}
+
 // InvalidateQueryCache invalidates the query cache (not implemented for Neo4j)
 func (n *N) InvalidateQueryCache() {}
@@ -1 +1 @@
-v0.29.10
+v0.29.11