Use dsl builder for es query

This commit is contained in:
Steve Perkins
2023-02-15 16:28:39 -05:00
parent 0e18a49861
commit a7a0bb6682
6 changed files with 114 additions and 113 deletions

3
go.mod
View File

@@ -28,6 +28,7 @@ require (
github.com/SaveTheRbtz/generic-sync-map-go v0.0.0-20220414055132-a37292614db8 // indirect
github.com/aead/siphash v1.0.1 // indirect
github.com/andybalholm/cascadia v1.3.1 // indirect
github.com/aquasecurity/esquery v0.2.0 // indirect
github.com/btcsuite/btcd v0.23.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/btcsuite/btcd/btcutil v1.1.1 // indirect
@@ -52,6 +53,8 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/decred/dcrd/lru v1.0.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect
github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect

7
go.sum
View File

@@ -26,6 +26,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA=
github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -125,6 +127,8 @@ github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHl
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8=
github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4=
github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -132,6 +136,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fergusstrange/embedded-postgres v1.10.0 h1:YnwF6xAQYmKLAXXrrRx4rHDLih47YJwVPvg8jeKfdNg=
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
@@ -223,6 +228,8 @@ github.com/jb55/lnsocket/go v0.0.0-20220725174341-b98b5cd37bb6/go.mod h1:atFK/q4
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs=
github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M=
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=

View File

@@ -1,3 +1,9 @@
# Search Relay
Uses ElasticSearch storage backend for all queries, with some basic full text search support.
Index some events:
```
bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \
jq -c '["EVENT", .]' | \
@@ -5,16 +11,16 @@ bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \
websocat -n -B 200000 ws://127.0.0.1:7447
```
Do a search:
```
echo '["REQ", "asdf", {"search": "steve", "kinds": [0]}]' | websocat -n ws://127.0.0.1:7447
```
todo:
## Customize
* index `content_search` field
* support search queries
* some kind of ranking signal (based on pubkey)
* better config for ES: adjust bulk indexer settings, use custom mapping?
Currently the indexing is very basic: It will index the `contents` field for all events where kind != 4.
Some additional mapping and pre-processing could add better support for different content types.
See comments in `storage/elasticsearch/elasticsearch.go`.
* ES DSL string builder might not escape json strings correctly...

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"strings"
"time"
@@ -47,12 +46,18 @@ var indexMapping = `
`
type ElasticsearchStorage struct {
es *elasticsearch.Client
bi esutil.BulkIndexer
indexName string
IndexName string
es *elasticsearch.Client
bi esutil.BulkIndexer
}
func (ess *ElasticsearchStorage) Init() error {
if ess.IndexName == "" {
ess.IndexName = "events"
}
cfg := elasticsearch.Config{}
if x := os.Getenv("ES_URL"); x != "" {
cfg.Addresses = strings.Split(x, ",")
@@ -62,10 +67,7 @@ func (ess *ElasticsearchStorage) Init() error {
return err
}
// todo: config + mapping settings
ess.indexName = "test3"
res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
if err != nil {
return err
}
@@ -79,13 +81,13 @@ func (ess *ElasticsearchStorage) Init() error {
// bulk indexer
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: ess.indexName, // The default index name
Client: es, // The Elasticsearch client
NumWorkers: 2, // The number of worker goroutines
FlushInterval: 3 * time.Second, // The periodic flush interval
Index: ess.IndexName,
Client: es,
NumWorkers: 2,
FlushInterval: 3 * time.Second,
})
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
return fmt.Errorf("error creating the indexer: %s", err)
}
ess.es = es
@@ -127,9 +129,6 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
}
err = <-done
if err != nil {
log.Println("DEL", err)
}
return err
}
@@ -182,8 +181,5 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error {
}
err = <-done
if err != nil {
log.Println("SAVE", err)
}
return err
}

View File

@@ -8,8 +8,9 @@ import (
"fmt"
"io"
"log"
"strings"
"reflect"
"github.com/aquasecurity/esquery"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
@@ -28,87 +29,67 @@ type EsSearchResult struct {
}
}
func buildDsl(filter *nostr.Filter) string {
b := &strings.Builder{}
b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`)
func buildDsl(filter *nostr.Filter) ([]byte, error) {
dsl := esquery.Bool()
prefixFilter := func(fieldName string, values []string) {
b.WriteString(`{"bool": {"should": [`)
for idx, val := range values {
if idx > 0 {
b.WriteRune(',')
}
op := "term"
if len(val) < 64 {
op = "prefix"
}
b.WriteString(fmt.Sprintf(`{"%s": {"event.%s": %q}}`, op, fieldName, val))
if len(values) == 0 {
return
}
b.WriteString(`]}},`)
prefixQ := esquery.Bool()
for _, v := range values {
if len(v) < 64 {
prefixQ.Should(esquery.Prefix(fieldName, v))
} else {
prefixQ.Should(esquery.Term(fieldName, v))
}
}
dsl.Must(prefixQ)
}
// ids
prefixFilter("id", filter.IDs)
prefixFilter("event.id", filter.IDs)
// authors
prefixFilter("pubkey", filter.Authors)
prefixFilter("event.pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
k, _ := json.Marshal(filter.Kinds)
b.WriteString(fmt.Sprintf(`{"terms": {"event.kind": %s}},`, k))
dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
}
// tags
{
b.WriteString(`{"bool": {"should": [`)
commaIdx := 0
if len(filter.Tags) > 0 {
tagQ := esquery.Bool()
for char, terms := range filter.Tags {
if len(terms) == 0 {
continue
}
if commaIdx > 0 {
b.WriteRune(',')
}
commaIdx++
b.WriteString(`{"bool": {"must": [`)
for _, t := range terms {
b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}},`, t))
}
// add the tag type at the end
b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}}`, char))
b.WriteString(`]}}`)
vs := toInterfaceSlice(append(terms, char))
tagQ.Should(esquery.Terms("event.tags", vs...))
}
b.WriteString(`]}},`)
dsl.Must(tagQ)
}
// since
if filter.Since != nil {
b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"gt": %d}}},`, filter.Since.Unix()))
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix()))
}
// until
if filter.Until != nil {
b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"lt": %d}}},`, filter.Until.Unix()))
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix()))
}
// search
if filter.Search != "" {
b.WriteString(fmt.Sprintf(`{"match": {"content_search": {"query": %s}}},`, filter.Search))
dsl.Must(esquery.Match("content_search", filter.Search))
}
// all blocks have a trailing comma...
// add a match_all "noop" at the end
// so json is valid
b.WriteString(`{"match_all": {}}`)
b.WriteString(`]}}}}}`)
return b.String()
return json.Marshal(esquery.Query(dsl))
}
func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) {
got, err := ess.es.Mget(
esutil.NewJSONReader(filter),
ess.es.Mget.WithIndex(ess.indexName))
ess.es.Mget.WithIndex(ess.IndexName))
if err != nil {
return nil, err
}
@@ -134,8 +115,6 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e
}
func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
// Perform the search request...
// need to build up query body...
if filter == nil {
return nil, errors.New("filter cannot be null")
}
@@ -145,8 +124,10 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even
return ess.getByID(filter)
}
dsl := buildDsl(filter)
pprint([]byte(dsl))
dsl, err := buildDsl(filter)
if err != nil {
return nil, err
}
limit := 1000
if filter.Limit > 0 && filter.Limit < limit {
@@ -156,14 +137,11 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even
es := ess.es
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(ess.indexName),
es.Search.WithIndex(ess.IndexName),
es.Search.WithBody(strings.NewReader(dsl)),
es.Search.WithBody(bytes.NewReader(dsl)),
es.Search.WithSize(limit),
es.Search.WithSort("event.created_at:desc"),
// es.Search.WithTrackTotalHits(true),
// es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
@@ -207,12 +185,23 @@ func isGetByID(filter *nostr.Filter) bool {
return isGetById
}
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
// from: https://stackoverflow.com/a/12754757
func toInterfaceSlice(slice interface{}) []interface{} {
s := reflect.ValueOf(slice)
if s.Kind() != reflect.Slice {
panic("InterfaceSlice() given a non-slice type")
}
// Keep the distinction between nil and empty slice input
if s.IsNil() {
return nil
}
ret := make([]interface{}, s.Len())
for i := 0; i < s.Len(); i++ {
ret[i] = s.Index(i).Interface()
}
return ret
}

View File

@@ -1,7 +1,9 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"testing"
"time"
@@ -12,34 +14,32 @@ func TestQuery(t *testing.T) {
now := time.Now()
yesterday := now.Add(time.Hour * -24)
filter := &nostr.Filter{
// IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
Kinds: []int{0, 1},
// Tags: nostr.TagMap{
// "a": []string{"abc"},
// "b": []string{"aaa", "bbb"},
// },
Since: &yesterday,
Until: &now,
Limit: 100,
Tags: nostr.TagMap{
"e": []string{"abc"},
"p": []string{"aaa", "bbb"},
},
Since: &yesterday,
Until: &now,
Limit: 100,
Search: "other stuff",
}
dsl := buildDsl(filter)
pprint([]byte(dsl))
if !json.Valid([]byte(dsl)) {
t.Fail()
dsl, err := buildDsl(filter)
if err != nil {
t.Fatal(err)
}
pprint(dsl)
// "integration" test
// ess := &ElasticsearchStorage{}
// err := ess.Init()
// if err != nil {
// t.Error(err)
// }
// found, err := ess.QueryEvents(filter)
// if err != nil {
// t.Error(err)
// }
// fmt.Println(found)
}
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
}
}