implement preliminary implementation of graph data model

This commit is contained in:
2025-11-25 18:08:44 +00:00
parent 655a7d9473
commit 6412edeabb
17 changed files with 4923 additions and 100 deletions

View File

@@ -39,6 +39,32 @@ type N struct {
// Ensure N implements database.Database interface at compile time
var _ database.Database = (*N)(nil)
// CollectedResult wraps pre-fetched Neo4j records for iteration after session close
// This is necessary because Neo4j results are lazy and need an open session for iteration
type CollectedResult struct {
records []*neo4j.Record
index int
}
// Next advances to the next record, returning true if there is one
func (r *CollectedResult) Next(ctx context.Context) bool {
r.index++
return r.index < len(r.records)
}
// Record returns the current record
func (r *CollectedResult) Record() *neo4j.Record {
if r.index < 0 || r.index >= len(r.records) {
return nil
}
return r.records[r.index]
}
// Len returns the number of records
func (r *CollectedResult) Len() int {
return len(r.records)
}
// init registers the neo4j database factory
func init() {
database.RegisterNeo4jFactory(func(
@@ -159,7 +185,8 @@ func (n *N) initNeo4jClient() error {
// ExecuteRead executes a read query against Neo4j
func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]any) (neo4j.ResultWithContext, error) {
// Returns a collected result that can be iterated after the session closes
func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]any) (*CollectedResult, error) {
session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
defer session.Close(ctx)
@@ -168,7 +195,14 @@ func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]an
return nil, fmt.Errorf("neo4j read query failed: %w", err)
}
return result, nil
// Collect all records before the session closes
// (Neo4j results are lazy and need an open session for iteration)
records, err := result.Collect(ctx)
if err != nil {
return nil, fmt.Errorf("neo4j result collect failed: %w", err)
}
return &CollectedResult{records: records, index: -1}, nil
}
// ExecuteWrite executes a write query against Neo4j
@@ -217,7 +251,7 @@ func (n *N) Close() (err error) {
return
}
// Wipe removes all data
// Wipe removes all data and re-applies schema
func (n *N) Wipe() (err error) {
// Delete all nodes and relationships in Neo4j
ctx := context.Background()
@@ -226,9 +260,14 @@ func (n *N) Wipe() (err error) {
return fmt.Errorf("failed to wipe neo4j database: %w", err)
}
// Remove data directory
if err = os.RemoveAll(n.dataDir); chk.E(err) {
return
// Re-apply schema (indexes and constraints were deleted with the data)
if err = n.applySchema(ctx); err != nil {
return fmt.Errorf("failed to re-apply schema after wipe: %w", err)
}
// Re-initialize serial counter (it was deleted with the Marker node)
if err = n.initSerialCounter(); err != nil {
return fmt.Errorf("failed to re-init serial counter after wipe: %w", err)
}
return nil