186 lines
4.0 KiB
Go
186 lines
4.0 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/elastic/go-elasticsearch/v8"
|
|
"github.com/elastic/go-elasticsearch/v8/esutil"
|
|
"github.com/fiatjaf/eventstore"
|
|
"github.com/nbd-wtf/go-nostr"
|
|
)
|
|
|
|
var _ eventstore.Storage = (*ElasticsearchStorage)(nil)
|
|
|
|
type IndexedEvent struct {
|
|
Event nostr.Event `json:"event"`
|
|
ContentSearch string `json:"content_search"`
|
|
}
|
|
|
|
var indexMapping = `
|
|
{
|
|
"settings": {
|
|
"number_of_shards": 1,
|
|
"number_of_replicas": 0
|
|
},
|
|
"mappings": {
|
|
"dynamic": false,
|
|
"properties": {
|
|
"event": {
|
|
"dynamic": false,
|
|
"properties": {
|
|
"id": {"type": "keyword"},
|
|
"pubkey": {"type": "keyword"},
|
|
"kind": {"type": "integer"},
|
|
"tags": {"type": "keyword"},
|
|
"created_at": {"type": "date"}
|
|
}
|
|
},
|
|
"content_search": {"type": "text"}
|
|
}
|
|
}
|
|
}
|
|
`
|
|
|
|
type ElasticsearchStorage struct {
|
|
URL string
|
|
IndexName string
|
|
|
|
es *elasticsearch.Client
|
|
bi esutil.BulkIndexer
|
|
}
|
|
|
|
func (ess *ElasticsearchStorage) Init() error {
|
|
if ess.IndexName == "" {
|
|
ess.IndexName = "events"
|
|
}
|
|
|
|
cfg := elasticsearch.Config{}
|
|
if ess.URL != "" {
|
|
cfg.Addresses = strings.Split(ess.URL, ",")
|
|
}
|
|
es, err := elasticsearch.NewClient(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if res.IsError() {
|
|
body, _ := io.ReadAll(res.Body)
|
|
txt := string(body)
|
|
if !strings.Contains(txt, "resource_already_exists_exception") {
|
|
return fmt.Errorf("%s", txt)
|
|
}
|
|
}
|
|
|
|
// bulk indexer
|
|
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
|
Index: ess.IndexName,
|
|
Client: es,
|
|
NumWorkers: 2,
|
|
FlushInterval: 3 * time.Second,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error creating the indexer: %s", err)
|
|
}
|
|
|
|
ess.es = es
|
|
ess.bi = bi
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
|
|
done := make(chan error)
|
|
err := ess.bi.Add(
|
|
ctx,
|
|
esutil.BulkIndexerItem{
|
|
Action: "delete",
|
|
DocumentID: evt.ID,
|
|
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
|
close(done)
|
|
},
|
|
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
|
if err != nil {
|
|
done <- err
|
|
} else {
|
|
// ok if deleted item not found
|
|
if res.Status == 404 {
|
|
close(done)
|
|
return
|
|
}
|
|
txt, _ := json.Marshal(res)
|
|
err := fmt.Errorf("ERROR: %s", txt)
|
|
done <- err
|
|
}
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = <-done
|
|
return err
|
|
}
|
|
|
|
func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
|
|
ie := &IndexedEvent{
|
|
Event: *evt,
|
|
}
|
|
|
|
// post processing: index for FTS
|
|
// some ideas:
|
|
// - index kind=0 fields a set of dedicated mapped fields
|
|
// (or use a separate index for profiles with a dedicated mapping)
|
|
// - if it's valid JSON just index the "values" and not the keys
|
|
// - more content introspection: language detection
|
|
// - denormalization... attach profile + ranking signals to events
|
|
if evt.Kind != 4 {
|
|
ie.ContentSearch = evt.Content
|
|
}
|
|
|
|
data, err := json.Marshal(ie)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
done := make(chan error)
|
|
|
|
// adapted from:
|
|
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
|
|
err = ess.bi.Add(
|
|
ctx,
|
|
esutil.BulkIndexerItem{
|
|
Action: "index",
|
|
DocumentID: evt.ID,
|
|
Body: bytes.NewReader(data),
|
|
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
|
close(done)
|
|
},
|
|
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
|
if err != nil {
|
|
done <- err
|
|
} else {
|
|
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
|
|
done <- err
|
|
}
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = <-done
|
|
return err
|
|
}
|