diff --git a/app/handle-req.go b/app/handle-req.go
index 4b7b0cc..77a6dc8 100644
--- a/app/handle-req.go
+++ b/app/handle-req.go
@@ -155,11 +155,15 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	// Multi-filter queries are not cached as they're more complex
 	if len(*env.Filters) == 1 && env.Filters != nil {
 		f := (*env.Filters)[0]
-		if cachedJSON, found := l.DB.GetCachedJSON(f); found {
-			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedJSON))
-			// Send cached JSON directly
-			for _, jsonEnvelope := range cachedJSON {
-				if _, err = l.Write(jsonEnvelope); err != nil {
+		if cachedEvents, found := l.DB.GetCachedEvents(f); found {
+			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedEvents))
+			// Wrap cached events with current subscription ID
+			for _, ev := range cachedEvents {
+				var res *eventenvelope.Result
+				if res, err = eventenvelope.NewResultWith(env.Subscription, ev); chk.E(err) {
+					return
+				}
+				if err = res.Write(l); err != nil {
 					if !strings.Contains(err.Error(), "context canceled") {
 						chk.E(err)
 					}
@@ -171,7 +175,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 			return
 		}
 		// Don't create subscription for cached results with satisfied limits
-		if f.Limit != nil && len(cachedJSON) >= int(*f.Limit) {
+		if f.Limit != nil && len(cachedEvents) >= int(*f.Limit) {
 			log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
 			return
 		}
@@ -551,8 +555,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	events = privateFilteredEvents
 
 	seen := make(map[string]struct{})
-	// Collect marshaled JSON for caching (only for single-filter queries)
-	var marshaledForCache [][]byte
+	// Cache events for single-filter queries (without subscription ID)
 	shouldCache := len(*env.Filters) == 1 && len(events) > 0
 
 	for _, ev := range events {
@@ -576,17 +579,6 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 			return
 		}
 
-		// Get serialized envelope for caching
-		if shouldCache {
-			serialized := res.Marshal(nil)
-			if len(serialized) > 0 {
-				// Make a copy for the cache
-				cacheCopy := make([]byte, len(serialized))
-				copy(cacheCopy, serialized)
-				marshaledForCache = append(marshaledForCache, cacheCopy)
-			}
-		}
-
 		if err = res.Write(l); err != nil {
 			// Don't log context canceled errors as they're expected during shutdown
 			if !strings.Contains(err.Error(), "context canceled") {
@@ -599,10 +591,11 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	}
 
 	// Populate cache after successfully sending all events
-	if shouldCache && len(marshaledForCache) > 0 {
+	// Cache the events themselves (not marshaled JSON with subscription ID)
+	if shouldCache && len(events) > 0 {
 		f := (*env.Filters)[0]
-		l.DB.CacheMarshaledJSON(f, marshaledForCache)
-		log.D.F("REQ %s: cached %d marshaled events", env.Subscription, len(marshaledForCache))
+		l.DB.CacheEvents(f, events)
+		log.D.F("REQ %s: cached %d events", env.Subscription, len(events))
 	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
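The substantive change in this file is what gets cached: raw events instead of fully marshaled `["EVENT", <subid>, <event>]` envelopes, so a single cache entry can serve requests arriving under any subscription ID, with the envelope rebuilt per request via `eventenvelope.NewResultWith`. A minimal, self-contained sketch of that wrapping step, assuming nothing about the relay's internal `eventenvelope` types (the `wrapCached` helper and the sample payload are hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wrapCached builds one ["EVENT", <subscription-id>, <event>] frame per
// cached event. Because the subscription ID is applied here, at send time,
// the cached events themselves stay subscription-agnostic.
func wrapCached(subID string, cachedEvents []json.RawMessage) ([][]byte, error) {
	frames := make([][]byte, 0, len(cachedEvents))
	for _, ev := range cachedEvents {
		frame, err := json.Marshal([]any{"EVENT", subID, ev})
		if err != nil {
			return nil, err
		}
		frames = append(frames, frame)
	}
	return frames, nil
}

func main() {
	ev := json.RawMessage(`{"kind":1,"content":"hello"}`)
	frames, _ := wrapCached("sub-abc", []json.RawMessage{ev})
	fmt.Println(string(frames[0])) // ["EVENT","sub-abc",{"kind":1,"content":"hello"}]
}
```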
diff --git a/pkg/database/database.go b/pkg/database/database.go
index 5175da6..ab58658 100644
--- a/pkg/database/database.go
+++ b/pkg/database/database.go
@@ -13,6 +13,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database/querycache"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 	"next.orly.dev/pkg/utils/units"
@@ -253,6 +254,22 @@ func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
 	}
 }
 
+// GetCachedEvents retrieves cached events for a filter (without subscription ID)
+// Returns nil, false if not found
+func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) {
+	if d.queryCache == nil {
+		return nil, false
+	}
+	return d.queryCache.GetEvents(f)
+}
+
+// CacheEvents stores events for a filter (without subscription ID)
+func (d *D) CacheEvents(f *filter.F, events event.S) {
+	if d.queryCache != nil && len(events) > 0 {
+		d.queryCache.PutEvents(f, events)
+	}
+}
+
 // Close releases resources and closes the database.
 func (d *D) Close() (err error) {
 	if d.seq != nil {
diff --git a/pkg/database/interface.go b/pkg/database/interface.go
index 5af67f3..ebad4e6 100644
--- a/pkg/database/interface.go
+++ b/pkg/database/interface.go
@@ -100,6 +100,8 @@ type Database interface {
 	// Query cache methods
 	GetCachedJSON(f *filter.F) ([][]byte, bool)
 	CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte)
+	GetCachedEvents(f *filter.F) (event.S, bool)
+	CacheEvents(f *filter.F, events event.S)
 	InvalidateQueryCache()
 
 	// Utility methods
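Both new `Database` methods are nil-safe no-ops when the query cache is disabled, so callers can use them unconditionally. A hedged sketch of the intended read-through pattern against the extended interface, using the diff's own `database`, `event`, and `filter` packages (the `queryEvents` variable is hypothetical, standing in for whatever query path a caller actually has):

```go
package example

import (
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/encoders/event"
	"next.orly.dev/pkg/encoders/filter"
)

// queryEvents stands in for the backend's real query path (hypothetical).
var queryEvents func(db database.Database, f *filter.F) (event.S, error)

// eventsFor consults the event cache before querying, and repopulates it
// after a miss. With a backend that stubs the cache methods (dgraph, neo4j),
// this degrades to a plain query.
func eventsFor(db database.Database, f *filter.F) (event.S, error) {
	if cached, ok := db.GetCachedEvents(f); ok {
		return cached, nil
	}
	evs, err := queryEvents(db, f)
	if err != nil {
		return nil, err
	}
	db.CacheEvents(f, evs) // no-op if the cache is disabled or evs is empty
	return evs, nil
}
```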
diff --git a/pkg/database/querycache/event_cache.go b/pkg/database/querycache/event_cache.go
index 43b0ce9..930e12b 100644
--- a/pkg/database/querycache/event_cache.go
+++ b/pkg/database/querycache/event_cache.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/klauspost/compress/zstd"
 	"lol.mleku.dev/log"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 )
 
@@ -400,3 +401,186 @@ func min(a, b int) int {
 	}
 	return b
 }
+
+// GetEvents retrieves cached events for a filter (decompresses and deserializes on the fly)
+// This is the new method that returns event.E objects instead of marshaled JSON
+func (c *EventCache) GetEvents(f *filter.F) (events []*event.E, found bool) {
+	// Normalize filter by sorting to ensure consistent cache keys
+	f.Sort()
+	filterKey := string(f.Serialize())
+
+	c.mu.RLock()
+	entry, exists := c.entries[filterKey]
+	if !exists {
+		c.mu.RUnlock()
+		c.mu.Lock()
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Check if entry is expired
+	if time.Since(entry.CreatedAt) > c.maxAge {
+		c.mu.RUnlock()
+		c.mu.Lock()
+		c.removeEntry(entry)
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Decompress
+	decompressed, err := c.decoder.DecodeAll(entry.CompressedData, nil)
+	c.mu.RUnlock()
+	if err != nil {
+		log.E.F("failed to decompress cached events: %v", err)
+		c.mu.Lock()
+		c.removeEntry(entry)
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Deserialize events from newline-delimited JSON
+	events = make([]*event.E, 0, entry.EventCount)
+	start := 0
+	for i, b := range decompressed {
+		if b == '\n' {
+			if i > start {
+				ev := event.New()
+				if _, err := ev.Unmarshal(decompressed[start:i]); err != nil {
+					log.E.F("failed to unmarshal cached event: %v", err)
+					c.mu.Lock()
+					c.removeEntry(entry)
+					c.misses++
+					c.mu.Unlock()
+					return nil, false
+				}
+				events = append(events, ev)
+			}
+			start = i + 1
+		}
+	}
+
+	// Handle last event if no trailing newline
+	if start < len(decompressed) {
+		ev := event.New()
+		if _, err := ev.Unmarshal(decompressed[start:]); err != nil {
+			log.E.F("failed to unmarshal cached event: %v", err)
+			c.mu.Lock()
+			c.removeEntry(entry)
+			c.misses++
+			c.mu.Unlock()
+			return nil, false
+		}
+		events = append(events, ev)
+	}
+
+	// Update access time and move to front
+	c.mu.Lock()
+	entry.LastAccess = time.Now()
+	c.lruList.MoveToFront(entry.listElement)
+	c.hits++
+	c.mu.Unlock()
+
+	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
+		filterKey[:min(50, len(filterKey))], entry.EventCount, entry.CompressedSize,
+		entry.UncompressedSize, float64(entry.UncompressedSize)/float64(entry.CompressedSize))
+
+	return events, true
+}
+
+// PutEvents stores events in the cache with ZSTD compression
+// This should be called AFTER events are sent to the client
+func (c *EventCache) PutEvents(f *filter.F, events []*event.E) {
+	if len(events) == 0 {
+		return
+	}
+
+	// Normalize filter by sorting to ensure consistent cache keys
+	f.Sort()
+	filterKey := string(f.Serialize())
+
+	// Serialize all events as newline-delimited JSON for compression
+	totalSize := 0
+	for _, ev := range events {
+		totalSize += ev.EstimateSize() + 1 // +1 for newline
+	}
+
+	uncompressed := make([]byte, 0, totalSize)
+	for _, ev := range events {
+		uncompressed = ev.Marshal(uncompressed)
+		uncompressed = append(uncompressed, '\n')
+	}
+
+	// Compress with ZSTD level 9
+	compressed := c.encoder.EncodeAll(uncompressed, nil)
+	compressedSize := len(compressed)
+
+	// Don't cache if compressed size is still too large
+	if int64(compressedSize) > c.maxSize {
+		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
+		return
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check if already exists
+	if existing, exists := c.entries[filterKey]; exists {
+		c.currentSize -= int64(existing.CompressedSize)
+		existing.CompressedData = compressed
+		existing.UncompressedSize = len(uncompressed)
+		existing.CompressedSize = compressedSize
+		existing.EventCount = len(events)
+		existing.LastAccess = time.Now()
+		existing.CreatedAt = time.Now()
+		c.currentSize += int64(compressedSize)
+		c.lruList.MoveToFront(existing.listElement)
+		c.updateCompressionRatio(len(uncompressed), compressedSize)
+		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
+			filterKey[:min(50, len(filterKey))], len(events),
+			float64(len(uncompressed))/float64(compressedSize))
+		return
+	}
+
+	// Evict if necessary
+	evictionCount := 0
+	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
+		oldest := c.lruList.Back()
+		if oldest != nil {
+			oldEntry := oldest.Value.(*EventCacheEntry)
+			c.removeEntry(oldEntry)
+			c.evictions++
+			evictionCount++
+		}
+	}
+
+	if evictionCount > 0 {
+		c.needsCompaction = true
+		select {
+		case c.compactionChan <- struct{}{}:
+		default:
+		}
+	}
+
+	// Create new entry
+	entry := &EventCacheEntry{
+		FilterKey:        filterKey,
+		CompressedData:   compressed,
+		UncompressedSize: len(uncompressed),
+		CompressedSize:   compressedSize,
+		EventCount:       len(events),
+		LastAccess:       time.Now(),
+		CreatedAt:        time.Now(),
+	}
+
+	entry.listElement = c.lruList.PushFront(entry)
+	c.entries[filterKey] = entry
+	c.currentSize += int64(compressedSize)
+	c.updateCompressionRatio(len(uncompressed), compressedSize)
+
+	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
+		filterKey[:min(50, len(filterKey))], len(events), len(uncompressed), compressedSize,
+		float64(len(uncompressed))/float64(compressedSize), c.currentSize, c.maxSize)
+}
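Under the hood, `PutEvents` stores each entry as newline-delimited event JSON compressed into one zstd frame, and `GetEvents` pays the decompress-and-unmarshal cost on every hit; the trade is CPU for memory, since many similar JSON events compress well as a single block. A standalone sketch of just that storage format, using the same `github.com/klauspost/compress/zstd` package the file imports (the event payloads are placeholders, and the diff's "level 9" is approximated here with `SpeedBestCompression`):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Placeholder event payloads, one JSON object per event.
	lines := [][]byte{
		[]byte(`{"id":"a1","kind":1,"content":"first"}`),
		[]byte(`{"id":"b2","kind":1,"content":"second"}`),
	}

	// Serialize: each event followed by a newline, as PutEvents does.
	var uncompressed []byte
	for _, l := range lines {
		uncompressed = append(uncompressed, l...)
		uncompressed = append(uncompressed, '\n')
	}

	// Compress the whole batch as a single zstd frame.
	enc, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
	compressed := enc.EncodeAll(uncompressed, nil)

	// On a cache hit: decompress, then split on '\n', mirroring the
	// manual scan loop in GetEvents.
	dec, _ := zstd.NewReader(nil)
	restored, _ := dec.DecodeAll(compressed, nil)
	sc := bufio.NewScanner(bytes.NewReader(restored))
	for sc.Scan() {
		fmt.Println(sc.Text())
	}
}
```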
diff --git a/pkg/dgraph/dgraph.go b/pkg/dgraph/dgraph.go
index 16eddc9..f0b37c4 100644
--- a/pkg/dgraph/dgraph.go
+++ b/pkg/dgraph/dgraph.go
@@ -16,6 +16,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 )
@@ -283,4 +284,6 @@ func (d *D) warmup() {
 }
 
 func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
 func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
+func (d *D) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
+func (d *D) CacheEvents(f *filter.F, events event.S) {}
 func (d *D) InvalidateQueryCache() {}
diff --git a/pkg/neo4j/neo4j.go b/pkg/neo4j/neo4j.go
index 065001b..e4f6ca3 100644
--- a/pkg/neo4j/neo4j.go
+++ b/pkg/neo4j/neo4j.go
@@ -13,6 +13,7 @@ import (
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
 	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 	"next.orly.dev/pkg/utils/apputil"
 )
@@ -273,5 +274,11 @@ func (n *N) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false }
 
 // CacheMarshaledJSON caches marshaled JSON results (not implemented for Neo4j)
 func (n *N) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
+
+// GetCachedEvents retrieves cached events (not implemented for Neo4j)
+func (n *N) GetCachedEvents(f *filter.F) (event.S, bool) { return nil, false }
+
+// CacheEvents caches events (not implemented for Neo4j)
+func (n *N) CacheEvents(f *filter.F, events event.S) {}
+
 // InvalidateQueryCache invalidates the query cache (not implemented for Neo4j)
 func (n *N) InvalidateQueryCache() {}
diff --git a/pkg/version/version b/pkg/version/version
index 1f64ca9..e362ce5 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.29.10
\ No newline at end of file
+v0.29.11
\ No newline at end of file
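One risk of widening the `Database` interface like this is a backend silently falling out of compliance; the dgraph and neo4j stubs above had to grow in lockstep with it. Where that is a concern, a compile-time assertion turns a missing stub into a build error instead of a runtime surprise. A sketch, assuming the backends are meant to satisfy `database.Database` (the diff does not show whether such assertions already exist in the repo):

```go
package neo4j

import "next.orly.dev/pkg/database"

// Compile-time check: *N must implement every Database method, including
// the newly added GetCachedEvents and CacheEvents.
var _ database.Database = (*N)(nil)
```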