Files
ratel/main.go
2024-09-13 17:55:31 +01:00

124 lines
3.3 KiB
Go

package ratel
import (
"encoding/binary"
"sync"
"time"
"ratel.mleku.dev/keys/index"
"ratel.mleku.dev/keys/serial"
eventstore "store.mleku.dev"
"util.mleku.dev/context"
)
// T is the badger-backed event store. It embeds a badger.DB together with a
// monotonic serial counter used to build event keys, plus the tunables that
// drive garbage collection of the store.
// NOTE(review): badger is referenced here but not present in the visible
// import block — confirm the import was not lost in transit.
type T struct {
	// Ctx is the owning context; its cancel function is managed by the caller
	// (see GetBackend).
	Ctx context.T
	// WG tracks background goroutines so the caller can wait for shutdown.
	WG *sync.WaitGroup
	// dataDir is the filesystem path handed to GetBackend; exposed via Path().
	dataDir string
	// DBSizeLimit is the number of bytes we want to keep the data store from exceeding.
	DBSizeLimit int
	// DBLowWater is the percentage of DBSizeLimit a GC run will reduce the used storage down
	// to.
	DBLowWater int
	// DBHighWater is the trigger point at which a GC run should start if exceeded.
	DBHighWater int
	// GCFrequency is the frequency of checks of the current utilisation.
	GCFrequency time.Duration
	// HasL2 signals that a second-layer store sits behind this one
	// (NOTE(review): inferred from the name — confirm against callers).
	HasL2 bool
	// BlockCacheSize is stored from the GetBackend argument; presumably fed to
	// badger's block cache configuration — verify where it is consumed.
	BlockCacheSize int
	// InitLogLevel is the log level the store is initialised with.
	InitLogLevel int
	// Logger is the store's logger instance.
	Logger *logger
	// DB is the badger db enveloper
	*badger.DB
	// seq is the monotonic collision free index for raw event storage.
	seq *badger.Sequence
	// Threads is how many CPU threads we dedicate to concurrent actions, flatten and GC mark
	Threads int
	// MaxLimit is a default limit that applies to a query without a limit, to avoid sending out
	// too many events to a client from a malformed or excessively broad filter.
	MaxLimit int
	// ActuallyDelete sets whether we actually delete or rewrite deleted entries with a modified
	// deleted prefix value (8th bit set)
	ActuallyDelete bool
}
// Compile-time assertion that *T implements the eventstore.I interface.
var _ eventstore.I = (*T)(nil)
// GetBackend returns a reasonably configured badger.Backend.
//
// The variadic params correspond to DBSizeLimit, DBLowWater, DBHighWater and
// GCFrequency as an integer multiplier of number of seconds. Values omitted
// fall back to the defaults: no size limit (0), 86% low water, 92% high water
// and a 60 second GC check interval.
//
// Note that the cancel function for the context needs to be managed by the
// caller.
func GetBackend(ctx context.T, wg *sync.WaitGroup, path S, hasL2 bool,
	blockCacheSize, logLevel,
	maxLimit int, params ...int) (b *T) {
	// Defaults for the optional tuning parameters; the fallthrough switch
	// overrides exactly as many of them as the caller supplied.
	var sizeLimit, lw, hw, freq = 0, 86, 92, 60
	switch len(params) {
	case 4:
		freq = params[3]
		fallthrough
	case 3:
		hw = params[2]
		fallthrough
	case 2:
		lw = params[1]
		fallthrough
	case 1:
		sizeLimit = params[0]
	}
	// If unset (zero or a nonsensical negative), assume a safe maximum limit
	// for unlimited filters.
	if maxLimit <= 0 {
		maxLimit = 512
	}
	b = &T{
		Ctx:            ctx,
		WG:             wg,
		DBSizeLimit:    sizeLimit,
		DBLowWater:     lw,
		DBHighWater:    hw,
		GCFrequency:    time.Duration(freq) * time.Second,
		HasL2:          hasL2,
		BlockCacheSize: blockCacheSize,
		InitLogLevel:   logLevel,
		MaxLimit:       maxLimit,
		dataDir:        path,
	}
	return
}
// Path returns the filesystem directory this store was opened with.
func (r *T) Path() (dir S) {
	dir = r.dataDir
	return
}
// SerialKey returns a key used for storing events, and the raw serial counter
// bytes to copy into index keys.
//
// Panics if a new serial cannot be obtained from the database sequence.
func (r *T) SerialKey() (idx []byte, ser *serial.T) {
	raw, err := r.SerialBytes()
	// Chk.E logs the error as a side effect and reports whether it is non-nil.
	if Chk.E(err) {
		panic(err)
	}
	ser = serial.New(raw)
	idx = index.Event.Key(ser)
	return
}
// Serial returns the next monotonic conflict free unique serial on the
// database.
//
// Any error from the underlying badger sequence is logged via Chk.E and also
// returned to the caller.
func (r *T) Serial() (ser uint64, err error) {
	ser, err = r.seq.Next()
	// Chk.E is called purely for its logging side effect; the original code
	// wrapped this in an if-statement with an empty body.
	Chk.E(err)
	// Log.T.F("serial %x", ser)
	return
}
// SerialBytes returns a new serial value, used to store an event record with a
// conflict-free unique code (it is a monotonic, atomic, ascending counter).
//
// Panics if the underlying sequence cannot produce a new value.
func (r *T) SerialBytes() (ser []byte, err error) {
	var next uint64
	if next, err = r.Serial(); Chk.E(err) {
		panic(err)
	}
	// Encode the counter big-endian into a serial.Len-sized buffer
	// (assumes serial.Len >= 8 — confirm in the serial package).
	ser = make([]byte, serial.Len)
	binary.BigEndian.PutUint64(ser, next)
	return
}