add dgraph backend to benchmark suite with safe type assertions for multi-backend support

This commit is contained in:
2025-11-17 16:52:38 +00:00
parent 86481a42e8
commit 038d1959ed
26 changed files with 2717 additions and 186 deletions

View File

@@ -2,7 +2,9 @@ package dgraph
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/dgraph-io/dgo/v230/protos/api"
"next.orly.dev/pkg/database/indexes/types"
@@ -98,13 +100,83 @@ func (d *D) DeleteEventBySerial(c context.Context, ser *types.Uint40, ev *event.
return nil
}
// DeleteExpired removes events that have passed their expiration time
// DeleteExpired removes events that have passed their expiration time (NIP-40)
func (d *D) DeleteExpired() {
// Query for events with expiration tags
// This is a stub - full implementation would:
// 1. Find events with "expiration" tag
// 2. Check if current time > expiration time
// 3. Delete those events
// Query for events that have an "expiration" tag
// NIP-40: events should have a tag ["expiration", "<unix timestamp>"]
query := `{
events(func: has(event.tags)) {
uid
event.id
event.tags
event.created_at
}
}`
resp, err := d.Query(context.Background(), query)
if err != nil {
d.Logger.Errorf("failed to query events for expiration: %v", err)
return
}
var result struct {
Events []struct {
UID string `json:"uid"`
ID string `json:"event.id"`
Tags string `json:"event.tags"`
CreatedAt int64 `json:"event.created_at"`
} `json:"events"`
}
if err = unmarshalJSON(resp.Json, &result); err != nil {
d.Logger.Errorf("failed to parse events for expiration: %v", err)
return
}
now := time.Now().Unix()
deletedCount := 0
for _, ev := range result.Events {
// Parse tags
if ev.Tags == "" {
continue
}
var tags [][]string
if err := json.Unmarshal([]byte(ev.Tags), &tags); err != nil {
continue
}
// Look for expiration tag
var expirationTime int64
for _, tag := range tags {
if len(tag) >= 2 && tag[0] == "expiration" {
// Parse expiration timestamp
if _, err := fmt.Sscanf(tag[1], "%d", &expirationTime); err != nil {
continue
}
break
}
}
// If expiration time found and passed, delete the event
if expirationTime > 0 && now > expirationTime {
mutation := &api.Mutation{
DelNquads: []byte(fmt.Sprintf("<%s> * * .", ev.UID)),
CommitNow: true,
}
if _, err := d.Mutate(context.Background(), mutation); err != nil {
d.Logger.Warningf("failed to delete expired event %s: %v", ev.ID, err)
} else {
deletedCount++
}
}
}
if deletedCount > 0 {
d.Logger.Infof("deleted %d expired events", deletedCount)
}
}
// ProcessDelete processes a kind 5 deletion event

View File

@@ -4,6 +4,7 @@ package dgraph
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
@@ -12,11 +13,11 @@ import (
"github.com/dgraph-io/dgo/v230"
"github.com/dgraph-io/dgo/v230/protos/api"
"google.golang.org/grpc"
"next.orly.dev/pkg/encoders/filter"
"google.golang.org/grpc/credentials/insecure"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/utils/apputil"
)
@@ -198,8 +199,11 @@ func (d *D) Mutate(ctx context.Context, mutation *api.Mutation) (*api.Response,
return nil, fmt.Errorf("dgraph mutation failed: %w", err)
}
if err := txn.Commit(ctx); err != nil {
return nil, fmt.Errorf("dgraph commit failed: %w", err)
// Only commit if CommitNow is false (mutation didn't auto-commit)
if !mutation.CommitNow {
if err := txn.Commit(ctx); err != nil {
return nil, fmt.Errorf("dgraph commit failed: %w", err)
}
}
return resp, nil
@@ -256,12 +260,38 @@ func (d *D) SetLogLevel(level string) {
// d.Logger.SetLevel(lol.GetLogLevel(level))
}
// EventIdsBySerial retrieves event IDs by serial range (stub)
// EventIdsBySerial retrieves event IDs by serial range
func (d *D) EventIdsBySerial(start uint64, count int) (
evs []uint64, err error,
) {
err = fmt.Errorf("not implemented")
return
// Query for events in the specified serial range
query := fmt.Sprintf(`{
events(func: ge(event.serial, %d), orderdesc: event.serial, first: %d) {
event.serial
}
}`, start, count)
resp, err := d.Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to query event IDs by serial: %w", err)
}
var result struct {
Events []struct {
Serial int64 `json:"event.serial"`
} `json:"events"`
}
if err = json.Unmarshal(resp.Json, &result); err != nil {
return nil, err
}
evs = make([]uint64, 0, len(result.Events))
for _, ev := range result.Events {
evs = append(evs, uint64(ev.Serial))
}
return evs, nil
}
// RunMigrations runs database migrations (no-op for dgraph)

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
@@ -54,15 +55,16 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (
return make(map[uint64]*event.E), nil
}
// Build query for multiple serials
serialStrs := make([]string, len(serials))
// Build a filter for multiple serials using OR conditions
serialConditions := make([]string, len(serials))
for i, ser := range serials {
serialStrs[i] = fmt.Sprintf("%d", ser.Get())
serialConditions[i] = fmt.Sprintf("eq(event.serial, %d)", ser.Get())
}
serialFilter := strings.Join(serialConditions, " OR ")
// Use uid() function for efficient multi-get
// Query with proper batch filtering
query := fmt.Sprintf(`{
events(func: uid(%s)) {
events(func: has(event.serial)) @filter(%s) {
event.id
event.kind
event.created_at
@@ -72,24 +74,70 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (
event.tags
event.serial
}
}`, serialStrs[0]) // Simplified - in production you'd handle multiple UIDs properly
}`, serialFilter)
resp, err := d.Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to fetch events by serials: %w", err)
}
evs, err := d.parseEventsFromResponse(resp.Json)
if err != nil {
// Parse the response including serial numbers
var result struct {
Events []struct {
ID string `json:"event.id"`
Kind int `json:"event.kind"`
CreatedAt int64 `json:"event.created_at"`
Content string `json:"event.content"`
Sig string `json:"event.sig"`
Pubkey string `json:"event.pubkey"`
Tags string `json:"event.tags"`
Serial int64 `json:"event.serial"`
} `json:"events"`
}
if err = json.Unmarshal(resp.Json, &result); err != nil {
return nil, err
}
// Map events by serial
// Map events by their serial numbers
events = make(map[uint64]*event.E)
for i, ser := range serials {
if i < len(evs) {
events[ser.Get()] = evs[i]
for _, ev := range result.Events {
// Decode hex strings
id, err := hex.Dec(ev.ID)
if err != nil {
continue
}
sig, err := hex.Dec(ev.Sig)
if err != nil {
continue
}
pubkey, err := hex.Dec(ev.Pubkey)
if err != nil {
continue
}
// Parse tags from JSON
var tags tag.S
if ev.Tags != "" {
if err := json.Unmarshal([]byte(ev.Tags), &tags); err != nil {
continue
}
}
// Create event
e := &event.E{
Kind: uint16(ev.Kind),
CreatedAt: ev.CreatedAt,
Content: []byte(ev.Content),
Tags: &tags,
}
// Copy fixed-size arrays
copy(e.ID[:], id)
copy(e.Sig[:], sig)
copy(e.Pubkey[:], pubkey)
events[uint64(ev.Serial)] = e
}
return events, nil
@@ -140,17 +188,54 @@ func (d *D) GetSerialsByIds(ids *tag.T) (
return serials, nil
}
// Query each ID individually (simplified implementation)
for _, id := range ids.T {
if len(id) >= 2 {
idStr := string(id[1])
serial, err := d.GetSerialById([]byte(idStr))
if err == nil {
serials[idStr] = serial
}
// Build batch query for all IDs at once
idConditions := make([]string, 0, len(ids.T))
idMap := make(map[string][]byte) // Map hex ID to original bytes
for _, idBytes := range ids.T {
if len(idBytes) > 0 {
idStr := hex.Enc(idBytes)
idConditions = append(idConditions, fmt.Sprintf("eq(event.id, %q)", idStr))
idMap[idStr] = idBytes
}
}
if len(idConditions) == 0 {
return serials, nil
}
// Create single query with OR conditions
idFilter := strings.Join(idConditions, " OR ")
query := fmt.Sprintf(`{
events(func: has(event.id)) @filter(%s) {
event.id
event.serial
}
}`, idFilter)
resp, err := d.Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to batch query serials by IDs: %w", err)
}
var result struct {
Events []struct {
ID string `json:"event.id"`
Serial int64 `json:"event.serial"`
} `json:"events"`
}
if err = json.Unmarshal(resp.Json, &result); err != nil {
return nil, err
}
// Map results back
for _, ev := range result.Events {
serial := types.Uint40{}
serial.Set(uint64(ev.Serial))
serials[ev.ID] = &serial
}
return serials, nil
}
@@ -191,10 +276,47 @@ func (d *D) GetSerialsByIdsWithFilter(
func (d *D) GetSerialsByRange(idx database.Range) (
serials types.Uint40s, err error,
) {
// This would need to be implemented based on how ranges are defined
// For now, returning not implemented
err = fmt.Errorf("not implemented")
return
// Range represents a byte-prefix range for index scanning
// For dgraph, we need to convert this to a query on indexed fields
// The range is typically used for scanning event IDs or other hex-encoded keys
if len(idx.Start) == 0 && len(idx.End) == 0 {
return nil, fmt.Errorf("empty range provided")
}
startStr := hex.Enc(idx.Start)
endStr := hex.Enc(idx.End)
// Query for events with IDs in the specified range
query := fmt.Sprintf(`{
events(func: ge(event.id, %q)) @filter(le(event.id, %q)) {
event.serial
}
}`, startStr, endStr)
resp, err := d.Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to query serials by range: %w", err)
}
var result struct {
Events []struct {
Serial int64 `json:"event.serial"`
} `json:"events"`
}
if err = json.Unmarshal(resp.Json, &result); err != nil {
return nil, err
}
serials = make([]*types.Uint40, 0, len(result.Events))
for _, ev := range result.Events {
serial := types.Uint40{}
serial.Set(uint64(ev.Serial))
serials = append(serials, &serial)
}
return serials, nil
}
// GetFullIdPubkeyBySerial retrieves ID and pubkey for a serial number

View File

@@ -6,8 +6,10 @@ import (
"encoding/json"
"fmt"
"io"
"strings"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
)
// Import imports events from a reader (JSONL format)
@@ -17,11 +19,83 @@ func (d *D) Import(rr io.Reader) {
// Export exports events to a writer (JSONL format)
func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
// Query all events or events for specific pubkeys
// Write as JSONL
// Build query based on whether pubkeys are specified
var query string
// Stub implementation
fmt.Fprintf(w, "# Export not yet implemented for dgraph\n")
if len(pubkeys) > 0 {
// Build pubkey filter
pubkeyStrs := make([]string, len(pubkeys))
for i, pk := range pubkeys {
pubkeyStrs[i] = fmt.Sprintf("eq(event.pubkey, %q)", hex.Enc(pk))
}
pubkeyFilter := strings.Join(pubkeyStrs, " OR ")
query = fmt.Sprintf(`{
events(func: has(event.id)) @filter(%s) {
event.id
event.kind
event.created_at
event.content
event.sig
event.pubkey
event.tags
}
}`, pubkeyFilter)
} else {
// Export all events
query = `{
events(func: has(event.id)) {
event.id
event.kind
event.created_at
event.content
event.sig
event.pubkey
event.tags
}
}`
}
// Execute query
resp, err := d.Query(c, query)
if err != nil {
d.Logger.Errorf("failed to query events for export: %v", err)
fmt.Fprintf(w, "# Error: failed to query events: %v\n", err)
return
}
// Parse events
evs, err := d.parseEventsFromResponse(resp.Json)
if err != nil {
d.Logger.Errorf("failed to parse events for export: %v", err)
fmt.Fprintf(w, "# Error: failed to parse events: %v\n", err)
return
}
// Write header comment
fmt.Fprintf(w, "# Exported %d events from dgraph\n", len(evs))
// Write each event as JSONL
count := 0
for _, ev := range evs {
jsonData, err := json.Marshal(ev)
if err != nil {
d.Logger.Warningf("failed to marshal event: %v", err)
continue
}
if _, err := fmt.Fprintf(w, "%s\n", jsonData); err != nil {
d.Logger.Errorf("failed to write event: %v", err)
return
}
count++
if count%1000 == 0 {
d.Logger.Infof("exported %d events", count)
}
}
d.Logger.Infof("export complete: %d events written", count)
}
// ImportEventsFromReader imports events from a reader

View File

@@ -48,6 +48,20 @@ func (d *D) QueryEventsWithOptions(
// buildDQLQuery constructs a DQL query from a Nostr filter
func (d *D) buildDQLQuery(f *filter.F, includeDeleteEvents bool) string {
return d.buildDQLQueryWithFields(f, includeDeleteEvents, []string{
"uid",
"event.id",
"event.kind",
"event.created_at",
"event.content",
"event.sig",
"event.pubkey",
"event.tags",
})
}
// buildDQLQueryWithFields constructs a DQL query with custom field selection
func (d *D) buildDQLQueryWithFields(f *filter.F, includeDeleteEvents bool, fields []string) string {
var conditions []string
var funcQuery string
@@ -139,18 +153,14 @@ func (d *D) buildDQLQuery(f *filter.F, includeDeleteEvents bool) string {
limitStr = fmt.Sprintf(", first: %d", f.Limit)
}
// Build field list
fieldStr := strings.Join(fields, "\n\t\t\t")
query := fmt.Sprintf(`{
events(func: %s%s%s%s) {
uid
event.id
event.kind
event.created_at
event.content
event.sig
event.pubkey
event.tags
%s
}
}`, funcQuery, filterStr, orderBy, limitStr)
}`, funcQuery, filterStr, orderBy, limitStr, fieldStr)
return query
}
@@ -257,12 +267,8 @@ func (d *D) QueryDeleteEventsByTargetId(c context.Context, targetEventId []byte)
func (d *D) QueryForSerials(c context.Context, f *filter.F) (
serials types.Uint40s, err error,
) {
// Build query
query := d.buildDQLQuery(f, false)
// Modify query to only return serial numbers
query = strings.Replace(query, "event.id\n\t\t\tevent.kind", "event.serial", 1)
query = strings.Replace(query, "\t\t\tevent.created_at\n\t\t\tevent.content\n\t\t\tevent.sig\n\t\t\tevent.pubkey\n\t\t\tevent.tags", "", 1)
// Build query requesting only serial numbers
query := d.buildDQLQueryWithFields(f, false, []string{"event.serial"})
resp, err := d.Query(c, query)
if err != nil {
@@ -293,11 +299,13 @@ func (d *D) QueryForSerials(c context.Context, f *filter.F) (
func (d *D) QueryForIds(c context.Context, f *filter.F) (
idPkTs []*store.IdPkTs, err error,
) {
// Build query
query := d.buildDQLQuery(f, false)
// Modify query to only return ID, pubkey, created_at, serial
query = strings.Replace(query, "event.kind\n\t\t\tevent.created_at\n\t\t\tevent.content\n\t\t\tevent.sig\n\t\t\tevent.pubkey\n\t\t\tevent.tags", "event.id\n\t\t\tevent.pubkey\n\t\t\tevent.created_at\n\t\t\tevent.serial", 1)
// Build query requesting only ID, pubkey, created_at, serial
query := d.buildDQLQueryWithFields(f, false, []string{
"event.id",
"event.pubkey",
"event.created_at",
"event.serial",
})
resp, err := d.Query(c, query)
if err != nil {
@@ -342,11 +350,8 @@ func (d *D) QueryForIds(c context.Context, f *filter.F) (
func (d *D) CountEvents(c context.Context, f *filter.F) (
count int, approximate bool, err error,
) {
// Build query with count
query := d.buildDQLQuery(f, false)
// Modify to count instead of returning full data
query = strings.Replace(query, "uid\n\t\t\tevent.id\n\t\t\tevent.kind\n\t\t\tevent.created_at\n\t\t\tevent.content\n\t\t\tevent.sig\n\t\t\tevent.pubkey\n\t\t\tevent.tags", "count(uid)", 1)
// Build query requesting only count
query := d.buildDQLQueryWithFields(f, false, []string{"count(uid)"})
resp, err := d.Query(c, query)
if err != nil {

View File

@@ -127,10 +127,8 @@ func (d *D) buildEventNQuads(ev *event.E, serial uint64) string {
// GetSerialsFromFilter returns event serials matching a filter
func (d *D) GetSerialsFromFilter(f *filter.F) (serials types.Uint40s, err error) {
// For dgraph, we'll use the event.serial field
// This is a stub implementation
err = fmt.Errorf("not implemented")
return
// Use QueryForSerials which already implements the proper filter logic
return d.QueryForSerials(context.Background(), f)
}
// WouldReplaceEvent checks if an event would replace existing events