bring in updates from relayer.

This commit is contained in:
fiatjaf
2023-10-31 15:49:01 -03:00
parent 473d817cc6
commit e95c3cb033
8 changed files with 48 additions and 34 deletions

View File

@@ -11,9 +11,12 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/fiatjaf/eventstore"
"github.com/nbd-wtf/go-nostr"
)
var _ eventstore.Storage = (*ElasticsearchStorage)(nil)
type IndexedEvent struct {
Event nostr.Event `json:"event"`
ContentSearch string `json:"content_search"`

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -73,12 +74,12 @@ func buildDsl(filter nostr.Filter) ([]byte, error) {
// since
if filter.Since != nil {
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since))
dsl.Must(esquery.Range("event.created_at").Gte(filter.Since))
}
// until
if filter.Until != nil {
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until))
dsl.Must(esquery.Range("event.created_at").Lte(filter.Until))
}
// search

View File

@@ -1,6 +1,7 @@
package postgresql
import (
"github.com/fiatjaf/eventstore"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/lib/pq"
@@ -14,6 +15,8 @@ const (
queryTagsLimit = 10
)
var _ eventstore.Storage = (*PostgresBackend)(nil)
func (b *PostgresBackend) Init() error {
db, err := sqlx.Connect("postgres", b.DatabaseURL)
if err != nil {

View File

@@ -17,11 +17,13 @@ func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (
query, params, err := b.queryEventsSql(filter, false)
if err != nil {
close(ch)
return nil, err
}
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
close(ch)
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
@@ -155,11 +157,11 @@ func (b PostgresBackend) queryEventsSql(filter nostr.Filter, doCount bool) (stri
}
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
conditions = append(conditions, "created_at >= ?")
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
conditions = append(conditions, "created_at <= ?")
params = append(params, filter.Until)
}

View File

@@ -1,33 +1,46 @@
package sqlite3
import (
"github.com/fiatjaf/eventstore"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/mattn/go-sqlite3"
)
var _ eventstore.Storage = (*SQLite3Backend)(nil)
var ddls = []string{
`CREATE TABLE IF NOT EXISTS event (
id text NOT NULL,
pubkey text NOT NULL,
created_at integer NOT NULL,
kind integer NOT NULL,
tags jsonb NOT NULL,
content text NOT NULL,
sig text NOT NULL);`,
`CREATE UNIQUE INDEX IF NOT EXISTS ididx ON event(id)`,
`CREATE INDEX IF NOT EXISTS pubkeyprefix ON event(pubkey)`,
`CREATE INDEX IF NOT EXISTS timeidx ON event(created_at DESC)`,
`CREATE INDEX IF NOT EXISTS kindidx ON event(kind)`,
}
func (b *SQLite3Backend) Init() error {
db, err := sqlx.Connect("sqlite3", b.DatabaseURL)
if err != nil {
return err
}
// sqlx default is 0 (unlimited), while sqlite3 by default accepts up to 100 connections
db.SetMaxOpenConns(80)
db.SetMaxOpenConns(b.MaxOpenConns)
db.SetMaxIdleConns(b.MaxIdleConns)
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
b.DB = db
_, err = b.DB.Exec(`
CREATE TABLE IF NOT EXISTS event (
id text NOT NULL,
pubkey text NOT NULL,
created_at integer NOT NULL,
kind integer NOT NULL,
tags jsonb NOT NULL,
content text NOT NULL,
sig text NOT NULL
);
`)
return err
for _, ddl := range ddls {
_, err = b.DB.Exec(ddl)
if err != nil {
return err
}
}
return nil
}

View File

@@ -17,11 +17,13 @@ func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
query, params, err := queryEventsSql(filter, false)
if err != nil {
close(ch)
return nil, err
}
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
close(ch)
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
@@ -150,11 +152,11 @@ func queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) {
}
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
conditions = append(conditions, "created_at >= ?")
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
conditions = append(conditions, "created_at <= ?")
params = append(params, filter.Until)
}
if filter.Search != "" {

View File

@@ -25,20 +25,8 @@ func (b *SQLite3Backend) SaveEvent(ctx context.Context, evt *nostr.Event) error
}
if nr == 0 {
return khatru.ErrDupEvent
return storage.ErrDupEvent
}
return nil
}
func (b *SQLite3Backend) BeforeSave(ctx context.Context, evt *nostr.Event) {
// do nothing
}
func (b *SQLite3Backend) AfterSave(evt *nostr.Event) {
// delete all but the 100 most recent ones for each key
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND created_at < (
SELECT created_at FROM event WHERE pubkey = $1
ORDER BY created_at DESC OFFSET 100 LIMIT 1
)`, evt.PubKey, evt.Kind)
}

View File

@@ -6,5 +6,7 @@ import (
type SQLite3Backend struct {
*sqlx.DB
DatabaseURL string
DatabaseURL string
MaxOpenConns int
MaxIdleConns int
}