import from khatru.

This commit is contained in:
fiatjaf
2023-10-31 15:40:42 -03:00
commit 473d817cc6
29 changed files with 3269 additions and 0 deletions

View File

@@ -0,0 +1,182 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
type IndexedEvent struct {
Event nostr.Event `json:"event"`
ContentSearch string `json:"content_search"`
}
var indexMapping = `
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"dynamic": false,
"properties": {
"event": {
"dynamic": false,
"properties": {
"id": {"type": "keyword"},
"pubkey": {"type": "keyword"},
"kind": {"type": "integer"},
"tags": {"type": "keyword"},
"created_at": {"type": "date"}
}
},
"content_search": {"type": "text"}
}
}
}
`
type ElasticsearchStorage struct {
URL string
IndexName string
es *elasticsearch.Client
bi esutil.BulkIndexer
}
func (ess *ElasticsearchStorage) Init() error {
if ess.IndexName == "" {
ess.IndexName = "events"
}
cfg := elasticsearch.Config{}
if ess.URL != "" {
cfg.Addresses = strings.Split(ess.URL, ",")
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return err
}
res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
if err != nil {
return err
}
if res.IsError() {
body, _ := io.ReadAll(res.Body)
txt := string(body)
if !strings.Contains(txt, "resource_already_exists_exception") {
return fmt.Errorf("%s", txt)
}
}
// bulk indexer
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: ess.IndexName,
Client: es,
NumWorkers: 2,
FlushInterval: 3 * time.Second,
})
if err != nil {
return fmt.Errorf("error creating the indexer: %s", err)
}
ess.es = es
ess.bi = bi
return nil
}
func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
done := make(chan error)
err := ess.bi.Add(
ctx,
esutil.BulkIndexerItem{
Action: "delete",
DocumentID: evt.ID,
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
// ok if deleted item not found
if res.Status == 404 {
close(done)
return
}
txt, _ := json.Marshal(res)
err := fmt.Errorf("ERROR: %s", txt)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}
func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
ie := &IndexedEvent{
Event: *evt,
}
// post processing: index for FTS
// some ideas:
// - index kind=0 fields a set of dedicated mapped fields
// (or use a separate index for profiles with a dedicated mapping)
// - if it's valid JSON just index the "values" and not the keys
// - more content introspection: language detection
// - denormalization... attach profile + ranking signals to events
if evt.Kind != 4 {
ie.ContentSearch = evt.Content
}
data, err := json.Marshal(ie)
if err != nil {
return err
}
done := make(chan error)
// adapted from:
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
err = ess.bi.Add(
ctx,
esutil.BulkIndexerItem{
Action: "index",
DocumentID: evt.ID,
Body: bytes.NewReader(data),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}

261
elasticsearch/query.go Normal file
View File

@@ -0,0 +1,261 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"reflect"
"github.com/aquasecurity/esquery"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
type EsSearchResult struct {
Took int
TimedOut bool `json:"timed_out"`
Hits struct {
Total struct {
Value int
Relation string
}
Hits []struct {
Source IndexedEvent `json:"_source"`
}
}
}
type EsCountResult struct {
Count int64
}
func buildDsl(filter nostr.Filter) ([]byte, error) {
dsl := esquery.Bool()
prefixFilter := func(fieldName string, values []string) {
if len(values) == 0 {
return
}
prefixQ := esquery.Bool()
for _, v := range values {
if len(v) < 64 {
prefixQ.Should(esquery.Prefix(fieldName, v))
} else {
prefixQ.Should(esquery.Term(fieldName, v))
}
}
dsl.Must(prefixQ)
}
// ids
prefixFilter("event.id", filter.IDs)
// authors
prefixFilter("event.pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
}
// tags
if len(filter.Tags) > 0 {
tagQ := esquery.Bool()
for char, terms := range filter.Tags {
vs := toInterfaceSlice(append(terms, char))
tagQ.Should(esquery.Terms("event.tags", vs...))
}
dsl.Must(tagQ)
}
// since
if filter.Since != nil {
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since))
}
// until
if filter.Until != nil {
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until))
}
// search
if filter.Search != "" {
dsl.Must(esquery.Match("content_search", filter.Search))
}
return json.Marshal(esquery.Query(dsl))
}
func (ess *ElasticsearchStorage) getByID(filter nostr.Filter) ([]*nostr.Event, error) {
got, err := ess.es.Mget(
esutil.NewJSONReader(filter),
ess.es.Mget.WithIndex(ess.IndexName))
if err != nil {
return nil, err
}
var mgetResponse struct {
Docs []struct {
Found bool
Source IndexedEvent `json:"_source"`
}
}
if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil {
return nil, err
}
events := make([]*nostr.Event, 0, len(mgetResponse.Docs))
for _, e := range mgetResponse.Docs {
if e.Found {
events = append(events, &e.Source.Event)
}
}
return events, nil
}
func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
// optimization: get by id
if isGetByID(filter) {
if evts, err := ess.getByID(filter); err == nil {
for _, evt := range evts {
ch <- evt
}
close(ch)
} else {
return nil, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return nil, err
}
limit := 1000
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
es := ess.es
res, err := es.Search(
es.Search.WithContext(ctx),
es.Search.WithIndex(ess.IndexName),
es.Search.WithBody(bytes.NewReader(dsl)),
es.Search.WithSize(limit),
es.Search.WithSort("event.created_at:desc"),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return nil, fmt.Errorf("%s", txt)
}
var r EsSearchResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
go func() {
for _, e := range r.Hits.Hits {
ch <- &e.Source.Event
}
close(ch)
}()
return ch, nil
}
func isGetByID(filter nostr.Filter) bool {
isGetById := len(filter.IDs) > 0 &&
len(filter.Authors) == 0 &&
len(filter.Kinds) == 0 &&
len(filter.Tags) == 0 &&
len(filter.Search) == 0 &&
filter.Since == nil &&
filter.Until == nil
if isGetById {
for _, id := range filter.IDs {
if len(id) != 64 {
return false
}
}
}
return isGetById
}
// from: https://stackoverflow.com/a/12754757
func toInterfaceSlice(slice interface{}) []interface{} {
s := reflect.ValueOf(slice)
if s.Kind() != reflect.Slice {
panic("InterfaceSlice() given a non-slice type")
}
// Keep the distinction between nil and empty slice input
if s.IsNil() {
return nil
}
ret := make([]interface{}, s.Len())
for i := 0; i < s.Len(); i++ {
ret[i] = s.Index(i).Interface()
}
return ret
}
func (ess *ElasticsearchStorage) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
count := int64(0)
// optimization: get by id
if isGetByID(filter) {
if evts, err := ess.getByID(filter); err == nil {
count += int64(len(evts))
} else {
return 0, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return 0, err
}
es := ess.es
res, err := es.Count(
es.Count.WithContext(ctx),
es.Count.WithIndex(ess.IndexName),
es.Count.WithBody(bytes.NewReader(dsl)),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return 0, fmt.Errorf("%s", txt)
}
var r EsCountResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return 0, err
}
return r.Count + count, nil
}

View File

@@ -0,0 +1,43 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"testing"
"github.com/nbd-wtf/go-nostr"
)
func TestQuery(t *testing.T) {
now := nostr.Now()
yesterday := now - 60*60*24
filter := &nostr.Filter{
IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
Kinds: []int{0, 1},
Tags: nostr.TagMap{
"e": []string{"abc"},
"p": []string{"aaa", "bbb"},
},
Since: &yesterday,
Until: &now,
Limit: 100,
Search: "other stuff",
}
dsl, err := buildDsl(filter)
if err != nil {
t.Fatal(err)
}
pprint(dsl)
}
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
}
}