diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index da24832..5c27edd 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -78,7 +78,12 @@
       "Bash(docker inspect:*)",
       "Bash(./run-benchmark-clean.sh:*)",
       "Bash(cd:*)",
-      "Bash(CGO_ENABLED=0 timeout 180 go build:*)"
+      "Bash(CGO_ENABLED=0 timeout 180 go build:*)",
+      "Bash(/home/mleku/src/next.orly.dev/pkg/dgraph/dgraph.go)",
+      "Bash(ORLY_LOG_LEVEL=debug timeout 60 ./orly:*)",
+      "Bash(ORLY_LOG_LEVEL=debug timeout 30 ./orly:*)",
+      "Bash(killall:*)",
+      "Bash(kill:*)"
     ],
     "deny": [],
     "ask": []
diff --git a/app/handle-req.go b/app/handle-req.go
index 403deba..d13fa5a 100644
--- a/app/handle-req.go
+++ b/app/handle-req.go
@@ -150,6 +150,34 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	)
 	defer queryCancel()
 
+	// Check the cache first for single-filter queries (the most common case);
+	// multi-filter queries are not cached because they are more complex
+	if env.Filters != nil && len(*env.Filters) == 1 {
+		f := (*env.Filters)[0]
+		if cachedJSON, found := l.DB.GetCachedJSON(f); found {
+			log.D.F("REQ %s: cache HIT, sending %d cached events", env.Subscription, len(cachedJSON))
+			// Send the cached JSON envelopes directly
+			for _, jsonEnvelope := range cachedJSON {
+				if _, err = l.Write(jsonEnvelope); err != nil {
+					if !strings.Contains(err.Error(), "context canceled") {
+						chk.E(err)
+					}
+					return
+				}
+			}
+			// Send EOSE
+			if err = eoseenvelope.NewFrom(env.Subscription).Write(l); chk.E(err) {
+				return
+			}
+			// Don't create a subscription for cached results whose limit is satisfied
+			if f.Limit != nil && len(cachedJSON) >= int(*f.Limit) {
+				log.D.F("REQ %s: limit satisfied by cache, not creating subscription", env.Subscription)
+				return
+			}
+			// Fall through to create a subscription for ongoing updates
+		}
+	}
+
 	// Collect all events from all filters
 	var allEvents event.S
 	for _, f := range *env.Filters {
@@ -558,6 +586,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	events = privateFilteredEvents
 
 	seen := make(map[string]struct{})
+	// Collect marshaled JSON for caching (only for single-filter queries)
+	var marshaledForCache [][]byte
+	shouldCache := len(*env.Filters) == 1 && len(events) > 0
+
 	for _, ev := range events {
 		log.T.C(
 			func() string {
@@ -578,6 +610,18 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		); chk.E(err) {
 			return
 		}
+
+		// Keep the serialized envelope for caching
+		if shouldCache {
+			serialized := res.Marshal(nil)
+			if len(serialized) > 0 {
+				// Make a copy for the cache
+				cacheCopy := make([]byte, len(serialized))
+				copy(cacheCopy, serialized)
+				marshaledForCache = append(marshaledForCache, cacheCopy)
+			}
+		}
+
 		if err = res.Write(l); err != nil {
 			// Don't log context canceled errors as they're expected during shutdown
 			if !strings.Contains(err.Error(), "context canceled") {
@@ -588,6 +632,13 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		// track the IDs we've sent (use hex encoding for stable key)
 		seen[hexenc.Enc(ev.ID)] = struct{}{}
 	}
+
+	// Populate the cache only after all events were sent successfully
+	if shouldCache && len(marshaledForCache) > 0 {
+		f := (*env.Filters)[0]
+		l.DB.CacheMarshaledJSON(f, marshaledForCache)
+		log.D.F("REQ %s: cached %d marshaled events", env.Subscription, len(marshaledForCache))
+	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
 	log.T.F("sending EOSE to %s", l.remote)
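The block above wires the cache into HandleReq, and the shape of the decision is easy to lose among the envelope plumbing. A minimal, self-contained sketch of the same hit/miss protocol, using toy types rather than the relay's own (toyCache, serveReq and the key format are illustrative only):

package main

import "fmt"

type toyCache map[string][][]byte

// serveReq mirrors HandleReq's cache logic: replay cached envelopes on a
// hit, send EOSE, and skip the live subscription only when the filter's
// limit is already satisfied by the cached results.
func serveReq(c toyCache, filterKey string, limit int) (subscribe bool) {
	if cached, ok := c[filterKey]; ok {
		for _, envelope := range cached {
			fmt.Printf("send: %s\n", envelope)
		}
		fmt.Println("send: EOSE")
		if limit > 0 && len(cached) >= limit {
			return false // limit satisfied, no subscription needed
		}
		return true // keep a subscription open for ongoing updates
	}
	// miss: fall through to the normal query path (elided here)
	return true
}

func main() {
	c := toyCache{`{"kinds":[1],"limit":1}`: {[]byte(`["EVENT","sub",{}]`)}}
	fmt.Println("subscribe:", serveReq(c, `{"kinds":[1],"limit":1}`, 1))
}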
log.T.F("sending EOSE to %s", l.remote) diff --git a/pkg/database/database.go b/pkg/database/database.go index 0ae1e89..289eab3 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -13,6 +13,7 @@ import ( "lol.mleku.dev" "lol.mleku.dev/chk" "next.orly.dev/pkg/database/querycache" + "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/utils/apputil" "next.orly.dev/pkg/utils/units" ) @@ -230,6 +231,24 @@ func (d *D) InvalidateQueryCache() { } } +// GetCachedJSON retrieves cached marshaled JSON for a filter +// Returns nil, false if not found +func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool) { + if d.queryCache == nil { + return nil, false + } + return d.queryCache.Get(f) +} + +// CacheMarshaledJSON stores marshaled JSON event envelopes for a filter +func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) { + if d.queryCache != nil && len(marshaledJSON) > 0 { + // Store the serialized JSON directly - this is already in envelope format + // We create a wrapper to store it with the right structure + d.queryCache.PutJSON(f, marshaledJSON) + } +} + // Close releases resources and closes the database. func (d *D) Close() (err error) { if d.seq != nil { diff --git a/pkg/database/interface.go b/pkg/database/interface.go index e241b05..5af67f3 100644 --- a/pkg/database/interface.go +++ b/pkg/database/interface.go @@ -97,6 +97,11 @@ type Database interface { // Migrations (version tracking for schema updates) RunMigrations() + // Query cache methods + GetCachedJSON(f *filter.F) ([][]byte, bool) + CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) + InvalidateQueryCache() + // Utility methods EventIdsBySerial(start uint64, count int) (evs []uint64, err error) } diff --git a/pkg/database/query-events.go b/pkg/database/query-events.go index c03465f..1dce164 100644 --- a/pkg/database/query-events.go +++ b/pkg/database/query-events.go @@ -51,14 +51,6 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) ( func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) ( evs event.S, err error, ) { - // Try cache first (only for standard queries, not special cases) - if d.queryCache != nil && !showAllVersions && includeDeleteEvents { - if cachedEvents, found := d.queryCache.Get(f); found { - log.D.F("QueryEventsWithOptions: returning %d cached events", len(cachedEvents)) - return cachedEvents, nil - } - } - // Determine if we should return multiple versions of replaceable events // based on the limit parameter wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1) @@ -592,12 +584,6 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete }() } - // Populate cache with results (only for standard queries) - if d.queryCache != nil && !showAllVersions && includeDeleteEvents && len(evs) > 0 { - d.queryCache.Put(f, evs) - log.D.F("QueryEventsWithOptions: cached %d events", len(evs)) - } - return } diff --git a/pkg/database/querycache/event_cache.go b/pkg/database/querycache/event_cache.go index 2dcc9e5..4b37e8c 100644 --- a/pkg/database/querycache/event_cache.go +++ b/pkg/database/querycache/event_cache.go @@ -5,8 +5,8 @@ import ( "sync" "time" + "github.com/klauspost/compress/zstd" "lol.mleku.dev/log" - "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/filter" ) @@ -17,31 +17,44 @@ const ( DefaultMaxAge = 5 * time.Minute ) -// EventCacheEntry represents a cached set of events for a filter +// EventCacheEntry represents a cached set of compressed 
diff --git a/pkg/database/querycache/event_cache.go b/pkg/database/querycache/event_cache.go
index 2dcc9e5..4b37e8c 100644
--- a/pkg/database/querycache/event_cache.go
+++ b/pkg/database/querycache/event_cache.go
@@ -5,8 +5,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/klauspost/compress/zstd"
 	"lol.mleku.dev/log"
-	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
 )
@@ -17,31 +17,44 @@ const (
 	DefaultMaxAge = 5 * time.Minute
 )
 
-// EventCacheEntry represents a cached set of events for a filter
+// EventCacheEntry represents a cached set of compressed serialized events
+// for a filter
 type EventCacheEntry struct {
-	FilterKey   string
-	Events      event.S // Slice of events
-	TotalSize   int     // Estimated size in bytes
-	LastAccess  time.Time
-	CreatedAt   time.Time
-	listElement *list.Element
+	FilterKey        string
+	CompressedData   []byte // ZSTD compressed serialized JSON events
+	UncompressedSize int    // Original size before compression (for stats)
+	CompressedSize   int    // Actual compressed size in bytes
+	EventCount       int    // Number of events in this entry
+	LastAccess       time.Time
+	CreatedAt        time.Time
+	listElement      *list.Element
 }
 
-// EventCache caches event.S results from database queries
+// EventCache caches serialized JSON query results with ZSTD compression
 type EventCache struct {
 	mu          sync.RWMutex
 	entries     map[string]*EventCacheEntry
 	lruList     *list.List
-	currentSize int64
+	currentSize int64 // Tracks compressed size
 	maxSize     int64
 	maxAge      time.Duration
 
-	hits          uint64
-	misses        uint64
-	evictions     uint64
-	invalidations uint64
+	// ZSTD encoder/decoder (reused for efficiency)
+	encoder *zstd.Encoder
+	decoder *zstd.Decoder
+
+	// Compaction tracking
+	needsCompaction bool
+	compactionChan  chan struct{}
+
+	// Metrics
+	hits             uint64
+	misses           uint64
+	evictions        uint64
+	invalidations    uint64
+	compressionRatio float64 // Average compression ratio
+	compactionRuns   uint64
 }
 
 // NewEventCache creates a new event cache
@@ -53,62 +66,134 @@ func NewEventCache(maxSize int64, maxAge time.Duration) *EventCache {
 		maxAge = DefaultMaxAge
 	}
 
-	c := &EventCache{
-		entries: make(map[string]*EventCacheEntry),
-		lruList: list.New(),
-		maxSize: maxSize,
-		maxAge:  maxAge,
+	// Create the ZSTD encoder at the library's best-compression level
+	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
+	if err != nil {
+		log.E.F("failed to create ZSTD encoder: %v", err)
+		return nil
 	}
 
+	// Create the ZSTD decoder
+	decoder, err := zstd.NewReader(nil)
+	if err != nil {
+		log.E.F("failed to create ZSTD decoder: %v", err)
+		return nil
+	}
+
+	c := &EventCache{
+		entries:        make(map[string]*EventCacheEntry),
+		lruList:        list.New(),
+		maxSize:        maxSize,
+		maxAge:         maxAge,
+		encoder:        encoder,
+		decoder:        decoder,
+		compactionChan: make(chan struct{}, 1),
+	}
+
+	// Start background workers
 	go c.cleanupExpired()
+	go c.compactionWorker()
 
 	return c
 }
 
-// Get retrieves cached events for a filter
-func (c *EventCache) Get(f *filter.F) (events event.S, found bool) {
+// Get retrieves cached serialized events for a filter, decompressing on the fly
+func (c *EventCache) Get(f *filter.F) (serializedJSON [][]byte, found bool) {
 	filterKey := string(f.Serialize())
 
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
+	c.mu.RLock()
 	entry, exists := c.entries[filterKey]
+	// Capture the fields we need while still holding the read lock, since
+	// PutJSON may replace them concurrently under the write lock
+	var compressedData []byte
+	var eventCount int
+	var expired bool
+	if exists {
+		compressedData = entry.CompressedData
+		eventCount = entry.EventCount
+		expired = time.Since(entry.CreatedAt) > c.maxAge
+	}
+	c.mu.RUnlock()
+
 	if !exists {
+		c.mu.Lock()
 		c.misses++
+		c.mu.Unlock()
 		return nil, false
 	}
 
 	// Check if expired
-	if time.Since(entry.CreatedAt) > c.maxAge {
+	if expired {
+		c.mu.Lock()
 		c.removeEntry(entry)
 		c.misses++
+		c.mu.Unlock()
 		return nil, false
 	}
 
+	// Decompress the data (outside the write lock for better concurrency)
+	decompressed, err := c.decoder.DecodeAll(compressedData, nil)
+	if err != nil {
+		log.E.F("failed to decompress cache entry: %v", err)
+		c.mu.Lock()
+		c.misses++
+		c.mu.Unlock()
+		return nil, false
+	}
+
+	// Split the decompressed blob back into individual JSON events.
+	// Format: each event is newline-delimited JSON.
+	serializedJSON = make([][]byte, 0, eventCount)
+	start := 0
+	for i := 0; i < len(decompressed); i++ {
+		if decompressed[i] == '\n' {
+			if i > start {
+				eventJSON := make([]byte, i-start)
+				copy(eventJSON, decompressed[start:i])
+				serializedJSON = append(serializedJSON, eventJSON)
+			}
+			start = i + 1
+		}
+	}
+	// Handle the last event if there is no trailing newline
+	if start < len(decompressed) {
+		eventJSON := make([]byte, len(decompressed)-start)
+		copy(eventJSON, decompressed[start:])
+		serializedJSON = append(serializedJSON, eventJSON)
+	}
+
 	// Update access time, bump the hit counter and move to front
+	c.mu.Lock()
 	entry.LastAccess = time.Now()
 	c.lruList.MoveToFront(entry.listElement)
-	c.hits++
-	log.D.F("event cache HIT: filter=%s events=%d", filterKey[:min(50, len(filterKey))], len(entry.Events))
+	c.hits++
+	c.mu.Unlock()
 
-	return entry.Events, true
+	log.D.F("event cache HIT: filter=%s events=%d compressed=%d uncompressed=%d ratio=%.2f",
+		filterKey[:min(50, len(filterKey))], eventCount, len(compressedData),
+		len(decompressed), float64(len(decompressed))/float64(len(compressedData)))
+
+	return serializedJSON, true
 }
 
-// Put stores events in the cache
-func (c *EventCache) Put(f *filter.F, events event.S) {
-	if len(events) == 0 {
+// PutJSON stores pre-marshaled JSON in the cache with ZSTD compression. It
+// should be called AFTER the events have been sent to the client, using the
+// same marshaled envelopes.
+func (c *EventCache) PutJSON(f *filter.F, marshaledJSON [][]byte) {
+	if len(marshaledJSON) == 0 {
 		return
 	}
 
 	filterKey := string(f.Serialize())
 
-	// Estimate size: each event is roughly 500 bytes on average
-	estimatedSize := len(events) * 500
+	// Concatenate all JSON events with newline delimiters for compression
+	totalSize := 0
+	for _, jsonData := range marshaledJSON {
+		totalSize += len(jsonData) + 1 // +1 for newline
+	}
 
-	// Don't cache if too large
-	if int64(estimatedSize) > c.maxSize {
-		log.W.F("event cache: entry too large: %d bytes", estimatedSize)
+	uncompressed := make([]byte, 0, totalSize)
+	for _, jsonData := range marshaledJSON {
+		uncompressed = append(uncompressed, jsonData...)
+		uncompressed = append(uncompressed, '\n')
+	}
+
+	// Compress with ZSTD
+	compressed := c.encoder.EncodeAll(uncompressed, nil)
+	compressedSize := len(compressed)
+
+	// Don't cache if the compressed entry is still too large
+	if int64(compressedSize) > c.maxSize {
+		log.W.F("event cache: compressed entry too large: %d bytes", compressedSize)
 		return
 	}
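The framing above relies on a property of the marshaled envelopes: JSON strings escape control characters, so a literal '\n' can never appear inside an envelope marshaled on a single line (as Go encoders emit by default, which is assumed here). A standalone round trip of the same frame/compress/split cycle using the klauspost zstd API; the envelope bytes are fabricated placeholders:

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	envelopes := [][]byte{
		[]byte(`["EVENT","sub1",{"kind":1}]`),
		[]byte(`["EVENT","sub1",{"kind":7}]`),
	}

	// PutJSON side: frame with '\n' and compress the whole blob once
	blob := bytes.Join(envelopes, []byte{'\n'})
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
	if err != nil {
		panic(err)
	}
	compressed := enc.EncodeAll(blob, nil)

	// Get side: decompress and split on '\n' to recover the envelopes
	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}
	plain, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		panic(err)
	}
	for _, env := range bytes.Split(plain, []byte{'\n'}) {
		fmt.Printf("%s\n", env)
	}
}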
@@ -117,41 +202,77 @@ func (c *EventCache) Put(f *filter.F, events event.S) {
 	// Check if already exists
 	if existing, exists := c.entries[filterKey]; exists {
-		c.currentSize -= int64(existing.TotalSize)
-		existing.Events = events
-		existing.TotalSize = estimatedSize
+		c.currentSize -= int64(existing.CompressedSize)
+		existing.CompressedData = compressed
+		existing.UncompressedSize = totalSize
+		existing.CompressedSize = compressedSize
+		existing.EventCount = len(marshaledJSON)
 		existing.LastAccess = time.Now()
 		existing.CreatedAt = time.Now()
-		c.currentSize += int64(estimatedSize)
+		c.currentSize += int64(compressedSize)
 		c.lruList.MoveToFront(existing.listElement)
+		c.updateCompressionRatio(totalSize, compressedSize)
+		log.T.F("event cache UPDATE: filter=%s events=%d ratio=%.2f",
+			filterKey[:min(50, len(filterKey))], len(marshaledJSON),
+			float64(totalSize)/float64(compressedSize))
 		return
 	}
 
 	// Evict if necessary
-	for c.currentSize+int64(estimatedSize) > c.maxSize && c.lruList.Len() > 0 {
+	evictionCount := 0
+	for c.currentSize+int64(compressedSize) > c.maxSize && c.lruList.Len() > 0 {
 		oldest := c.lruList.Back()
 		if oldest != nil {
 			oldEntry := oldest.Value.(*EventCacheEntry)
 			c.removeEntry(oldEntry)
 			c.evictions++
+			evictionCount++
+		}
+	}
+
+	// Trigger compaction if we evicted entries
+	if evictionCount > 0 {
+		c.needsCompaction = true
+		select {
+		case c.compactionChan <- struct{}{}:
+		default:
+			// A signal is already pending; compaction will run anyway
 		}
 	}
 
 	// Create new entry
 	entry := &EventCacheEntry{
-		FilterKey:  filterKey,
-		Events:     events,
-		TotalSize:  estimatedSize,
-		LastAccess: time.Now(),
-		CreatedAt:  time.Now(),
+		FilterKey:        filterKey,
+		CompressedData:   compressed,
+		UncompressedSize: totalSize,
+		CompressedSize:   compressedSize,
+		EventCount:       len(marshaledJSON),
+		LastAccess:       time.Now(),
+		CreatedAt:        time.Now(),
 	}
 	entry.listElement = c.lruList.PushFront(entry)
 	c.entries[filterKey] = entry
-	c.currentSize += int64(estimatedSize)
+	c.currentSize += int64(compressedSize)
+	c.updateCompressionRatio(totalSize, compressedSize)
 
-	log.D.F("event cache PUT: filter=%s events=%d size=%d total=%d/%d",
-		filterKey[:min(50, len(filterKey))], len(events), estimatedSize, c.currentSize, c.maxSize)
+	log.D.F("event cache PUT: filter=%s events=%d uncompressed=%d compressed=%d ratio=%.2f total=%d/%d",
+		filterKey[:min(50, len(filterKey))], len(marshaledJSON), totalSize, compressedSize,
+		float64(totalSize)/float64(compressedSize), c.currentSize, c.maxSize)
+}
+
+// updateCompressionRatio updates the rolling average compression ratio
+func (c *EventCache) updateCompressionRatio(uncompressed, compressed int) {
+	if compressed == 0 {
+		return
+	}
+	newRatio := float64(uncompressed) / float64(compressed)
+	// Use an exponential moving average: 90% history, 10% new sample
+	if c.compressionRatio == 0 {
+		c.compressionRatio = newRatio
+	} else {
+		c.compressionRatio = 0.9*c.compressionRatio + 0.1*newRatio
+	}
 }
 
 // Invalidate clears all entries (called when new events are stored)
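updateCompressionRatio smooths per-entry ratios into a single figure with a 0.9/0.1 exponential moving average, seeded by the first observation. The arithmetic in isolation, with made-up sample ratios:

package main

import "fmt"

func main() {
	avg := 0.0
	for _, sample := range []float64{3.0, 4.0, 2.5, 5.0} {
		if avg == 0 {
			avg = sample // seed with the first observation
		} else {
			avg = 0.9*avg + 0.1*sample // 90% history, 10% new sample
		}
		fmt.Printf("sample=%.1f avg=%.3f\n", sample, avg)
	}
}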
@@ -173,7 +294,33 @@ func (c *EventCache) Invalidate() {
 func (c *EventCache) removeEntry(entry *EventCacheEntry) {
 	delete(c.entries, entry.FilterKey)
 	c.lruList.Remove(entry.listElement)
-	c.currentSize -= int64(entry.TotalSize)
+	c.currentSize -= int64(entry.CompressedSize)
+}
+
+// compactionWorker runs in the background and responds to eviction bursts
+// signalled through compactionChan
+func (c *EventCache) compactionWorker() {
+	for range c.compactionChan {
+		c.mu.Lock()
+		if !c.needsCompaction {
+			c.mu.Unlock()
+			continue
+		}
+
+		log.D.F("cache compaction: starting (entries=%d size=%d/%d)",
+			len(c.entries), c.currentSize, c.maxSize)
+
+		// Each entry is already individually ZSTD compressed, so there is
+		// little further compression to gain here; the space is reclaimed
+		// by the evictions themselves. This pass only resets the flag and
+		// records that a compaction cycle ran.
+		c.needsCompaction = false
+		c.compactionRuns++
+		c.mu.Unlock()
+
+		log.D.F("cache compaction: completed (runs=%d)", c.compactionRuns)
+	}
 }
 
 // cleanupExpired removes expired entries periodically
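The buffered channel of size one plus the non-blocking send in PutJSON means any burst of evictions coalesces into at most one pending wake-up for compactionWorker. The pattern in isolation (toy worker and prints, not the cache's own code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	signal := make(chan struct{}, 1)
	var wg sync.WaitGroup
	wg.Add(1)

	// worker: one pass per wake-up, however many triggers arrived meanwhile
	go func() {
		defer wg.Done()
		for range signal {
			fmt.Println("compaction pass")
		}
	}()

	// many triggers collapse into few passes: the send succeeds only when
	// the one-slot buffer is empty, otherwise the default arm drops it
	for i := 0; i < 100; i++ {
		select {
		case signal <- struct{}{}:
		default: // already signalled; skip
		}
	}
	close(signal)
	wg.Wait()
}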
@@ -206,14 +353,16 @@ func (c *EventCache) cleanupExpired() {
 // CacheStats holds cache performance metrics
 type CacheStats struct {
-	Entries       int
-	CurrentSize   int64
-	MaxSize       int64
-	Hits          uint64
-	Misses        uint64
-	HitRate       float64
-	Evictions     uint64
-	Invalidations uint64
+	Entries          int
+	CurrentSize      int64 // Compressed size
+	MaxSize          int64
+	Hits             uint64
+	Misses           uint64
+	HitRate          float64
+	Evictions        uint64
+	Invalidations    uint64
+	CompressionRatio float64 // Average compression ratio
+	CompactionRuns   uint64
 }
 
 // Stats returns cache statistics
@@ -228,14 +377,16 @@ func (c *EventCache) Stats() CacheStats {
 	}
 
 	return CacheStats{
-		Entries:       len(c.entries),
-		CurrentSize:   c.currentSize,
-		MaxSize:       c.maxSize,
-		Hits:          c.hits,
-		Misses:        c.misses,
-		HitRate:       hitRate,
-		Evictions:     c.evictions,
-		Invalidations: c.invalidations,
+		Entries:          len(c.entries),
+		CurrentSize:      c.currentSize,
+		MaxSize:          c.maxSize,
+		Hits:             c.hits,
+		Misses:           c.misses,
+		HitRate:          hitRate,
+		Evictions:        c.evictions,
+		Invalidations:    c.invalidations,
+		CompressionRatio: c.compressionRatio,
+		CompactionRuns:   c.compactionRuns,
 	}
 }
diff --git a/pkg/dgraph/dgraph.go b/pkg/dgraph/dgraph.go
index 682fd1c..efe90be 100644
--- a/pkg/dgraph/dgraph.go
+++ b/pkg/dgraph/dgraph.go
@@ -12,6 +12,7 @@ import (
 	"github.com/dgraph-io/dgo/v230"
 	"github.com/dgraph-io/dgo/v230/protos/api"
 	"google.golang.org/grpc"
+	"next.orly.dev/pkg/encoders/filter"
 	"google.golang.org/grpc/credentials/insecure"
 	"lol.mleku.dev"
 	"lol.mleku.dev/chk"
@@ -283,3 +284,6 @@ func (d *D) warmup() {
 	// Just give a brief moment for any background processes to settle
 	d.Logger.Infof("dgraph database warmup complete, ready to serve requests")
 }
+func (d *D) GetCachedJSON(f *filter.F) ([][]byte, bool)             { return nil, false }
+func (d *D) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {}
+func (d *D) InvalidateQueryCache()                                  {}
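The three one-line stubs above exist so the dgraph backend keeps satisfying the widened Database interface. A self-contained sketch of the compile-time guard that makes such stubs hard to forget; Filter, Database and dgraphLike are stand-ins for the repo's types, not the actual ones:

package main

import "fmt"

type Filter struct{}

type Database interface {
	GetCachedJSON(f *Filter) ([][]byte, bool)
	CacheMarshaledJSON(f *Filter, marshaledJSON [][]byte)
	InvalidateQueryCache()
}

type dgraphLike struct{}

func (dgraphLike) GetCachedJSON(*Filter) ([][]byte, bool) { return nil, false }
func (dgraphLike) CacheMarshaledJSON(*Filter, [][]byte)   {}
func (dgraphLike) InvalidateQueryCache()                  {}

// The blank assignment fails to compile if any method is missing,
// catching interface drift at build time rather than at runtime.
var _ Database = dgraphLike{}

func main() { fmt.Println("dgraphLike satisfies Database") }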