bring in mysql backend from relayer.

This commit is contained in:
fiatjaf
2023-10-31 16:06:44 -03:00
parent a82b9b4bde
commit 28fc5b0571
7 changed files with 371 additions and 23 deletions

12
mysql/delete.go Normal file
View File

@@ -0,0 +1,12 @@
package mysql
import (
"context"
"github.com/nbd-wtf/go-nostr"
)
func (b MySQLBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
_, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = ?", evt.ID)
return err
}

71
mysql/init.go Normal file
View File

@@ -0,0 +1,71 @@
package mysql
import (
"strings"
"github.com/fiatjaf/eventstore"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
)
const (
queryLimit = 100
queryIDsLimit = 500
queryAuthorsLimit = 500
queryKindsLimit = 10
queryTagsLimit = 10
)
var _ eventstore.Storage = (*MySQLBackend)(nil)
var ddls = []string{
`CREATE TABLE IF NOT EXISTS event (
id char(64) NOT NULL primary key,
pubkey char(64) NOT NULL,
created_at int NOT NULL,
kind integer NOT NULL,
tags json NOT NULL,
content text NOT NULL,
sig text NOT NULL);`,
`CREATE INDEX pubkeyprefix ON event (pubkey);`,
`CREATE INDEX timeidx ON event (created_at DESC);`,
`CREATE INDEX kindidx ON event (kind);`,
}
func (b *MySQLBackend) Init() error {
db, err := sqlx.Connect("mysql", b.DatabaseURL)
if err != nil {
return err
}
// sqlx default is 0 (unlimited), while mysql by default accepts up to 100 connections
db.SetMaxOpenConns(80)
db.Mapper = reflectx.NewMapperFunc("json", sqlx.NameMapper)
b.DB = db
for _, ddl := range ddls {
_, err := b.DB.Exec(ddl)
if err != nil && !strings.HasPrefix(err.Error(), `Error 1061: Duplicate key name`) {
return err
}
}
if b.QueryLimit == 0 {
b.QueryLimit = queryLimit
}
if b.QueryIDsLimit == 0 {
b.QueryIDsLimit = queryIDsLimit
}
if b.QueryAuthorsLimit == 0 {
b.QueryAuthorsLimit = queryAuthorsLimit
}
if b.QueryKindsLimit == 0 {
b.QueryKindsLimit = queryKindsLimit
}
if b.QueryTagsLimit == 0 {
b.QueryTagsLimit = queryTagsLimit
}
return err
}

15
mysql/mysql.go Normal file
View File

@@ -0,0 +1,15 @@
package mysql
import (
"github.com/jmoiron/sqlx"
)
type MySQLBackend struct {
*sqlx.DB
DatabaseURL string
QueryLimit int
QueryIDsLimit int
QueryAuthorsLimit int
QueryKindsLimit int
QueryTagsLimit int
}

189
mysql/query.go Normal file
View File

@@ -0,0 +1,189 @@
package mysql
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"strconv"
"strings"
"github.com/jmoiron/sqlx"
"github.com/nbd-wtf/go-nostr"
)
func (b MySQLBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch chan *nostr.Event, err error) {
ch = make(chan *nostr.Event)
query, params, err := b.queryEventsSql(filter, false)
if err != nil {
close(ch)
return nil, err
}
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
close(ch)
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
go func() {
defer rows.Close()
defer close(ch)
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return
}
evt.CreatedAt = nostr.Timestamp(timestamp)
ch <- &evt
}
}()
return ch, nil
}
func (b MySQLBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
query, params, err := b.queryEventsSql(filter, true)
if err != nil {
return 0, err
}
var count int64
if err = b.DB.QueryRow(query, params...).Scan(&count); err != nil && err != sql.ErrNoRows {
return 0, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
return count, nil
}
func (b MySQLBackend) queryEventsSql(filter nostr.Filter, doCount bool) (string, []any, error) {
var conditions []string
var params []any
if filter.IDs != nil {
if len(filter.IDs) > b.QueryIDsLimit {
// too many ids, fail everything
return "", nil, nil
}
likeids := make([]string, 0, len(filter.IDs))
for _, id := range filter.IDs {
// to prevent sql attack here we will check if
// these ids are valid 32byte hex
parsed, err := hex.DecodeString(id)
if err != nil || len(parsed) != 32 {
continue
}
likeids = append(likeids, fmt.Sprintf("id LIKE '%x%%'", parsed))
}
if len(likeids) == 0 {
// ids being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likeids, " OR ")+")")
}
if filter.Authors != nil {
if len(filter.Authors) > b.QueryAuthorsLimit {
// too many authors, fail everything
return "", nil, nil
}
likekeys := make([]string, 0, len(filter.Authors))
for _, key := range filter.Authors {
// to prevent sql attack here we will check if
// these keys are valid 32byte hex
parsed, err := hex.DecodeString(key)
if err != nil || len(parsed) != 32 {
continue
}
likekeys = append(likekeys, fmt.Sprintf("pubkey LIKE '%x%%'", parsed))
}
if len(likekeys) == 0 {
// authors being [] mean you won't get anything
return "", nil, nil
}
conditions = append(conditions, "("+strings.Join(likekeys, " OR ")+")")
}
if filter.Kinds != nil {
if len(filter.Kinds) > b.QueryKindsLimit {
// too many kinds, fail everything
return "", nil, nil
}
if len(filter.Kinds) == 0 {
// kinds being [] mean you won't get anything
return "", nil, nil
}
// no sql injection issues since these are ints
inkinds := make([]string, len(filter.Kinds))
for i, kind := range filter.Kinds {
inkinds[i] = strconv.Itoa(kind)
}
conditions = append(conditions, `kind IN (`+strings.Join(inkinds, ",")+`)`)
}
tagQuery := make([]string, 0, 1)
for _, values := range filter.Tags {
if len(values) == 0 {
// any tag set to [] is wrong
return "", nil, nil
}
// add these tags to the query
tagQuery = append(tagQuery, values...)
if len(tagQuery) > b.QueryTagsLimit {
// too many tags, fail everything
return "", nil, nil
}
}
// we use a very bad implementation in which we only check the tag values and
// ignore the tag names
for _, tagValue := range tagQuery {
params = append(params, "%"+tagValue+"%")
conditions = append(conditions, "tags LIKE ?")
}
if filter.Since != nil {
conditions = append(conditions, "created_at >= ?")
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at <= ?")
params = append(params, filter.Until)
}
if len(conditions) == 0 {
// fallback
conditions = append(conditions, "true")
}
if filter.Limit < 1 || filter.Limit > b.QueryLimit {
params = append(params, b.QueryLimit)
} else {
params = append(params, filter.Limit)
}
var query string
if doCount {
query = sqlx.Rebind(sqlx.BindType("mysql"), `SELECT
COUNT(*)
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
} else {
query = sqlx.Rebind(sqlx.BindType("mysql"), `SELECT
id, pubkey, created_at, kind, tags, content, sig
FROM event WHERE `+
strings.Join(conditions, " AND ")+
" ORDER BY created_at DESC LIMIT ?")
}
return query, params, nil
}

76
mysql/save.go Normal file
View File

@@ -0,0 +1,76 @@
package mysql
import (
"context"
"encoding/json"
"github.com/fiatjaf/eventstore"
"github.com/nbd-wtf/go-nostr"
)
func (b *MySQLBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
deleteQuery, deleteParams, shouldDelete := deleteBeforeSaveSql(evt)
if shouldDelete {
_, _ = b.DB.ExecContext(ctx, deleteQuery, deleteParams...)
}
sql, params, _ := saveEventSql(evt)
res, err := b.DB.ExecContext(ctx, sql, params...)
if err != nil {
return err
}
nr, err := res.RowsAffected()
if err != nil {
return err
}
if nr == 0 {
return eventstore.ErrDupEvent
}
return nil
}
func deleteBeforeSaveSql(evt *nostr.Event) (string, []any, bool) {
// react to different kinds of events
var (
query = ""
params []any
shouldDelete bool
)
if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) {
// delete past events from this user
query = `DELETE FROM event WHERE pubkey = ? AND kind = ?`
params = []any{evt.PubKey, evt.Kind}
shouldDelete = true
} else if evt.Kind == nostr.KindRecommendServer {
// delete past recommend_server events equal to this one
query = `DELETE FROM event WHERE pubkey = ? AND kind = ? AND content = ?`
params = []any{evt.PubKey, evt.Kind, evt.Content}
shouldDelete = true
} else if evt.Kind >= 30000 && evt.Kind < 40000 {
// NIP-33
d := evt.Tags.GetFirst([]string{"d"})
if d != nil {
query = `DELETE FROM event WHERE pubkey = ? AND kind = ? AND tags LIKE ?`
params = []any{evt.PubKey, evt.Kind, d.Value()}
shouldDelete = true
}
}
return query, params, shouldDelete
}
func saveEventSql(evt *nostr.Event) (string, []any, error) {
const query = `INSERT INTO event (
id, pubkey, created_at, kind, tags, content, sig)
VALUES (?, ?, ?, ?, ?, ?, ?)`
var (
tagsj, _ = json.Marshal(evt.Tags)
params = []any{evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig}
)
return query, params, nil
}