Merge pull request #17 from mattn/opensearch
Some checks failed
build cli / make-release (push) Has been cancelled
build cli / build-linux (push) Has been cancelled

add opensearch
This commit is contained in:
mattn
2024-05-24 00:48:29 +09:00
committed by GitHub
4 changed files with 448 additions and 3 deletions

3
go.mod
View File

@@ -13,7 +13,8 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v1.14.18
github.com/nbd-wtf/go-nostr v0.28.5
github.com/stretchr/testify v1.8.4
github.com/opensearch-project/opensearch-go/v4 v4.0.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v3 v3.0.0-alpha7
go.etcd.io/bbolt v1.3.9
golang.org/x/exp v0.0.0-20231006140011-7918f672742d

10
go.sum
View File

@@ -116,6 +116,8 @@ github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nbd-wtf/go-nostr v0.28.5 h1:5vBAFKGVJ6Rhq2Jrtj+v+j8bUVLdsao5SFdBIQ7PJR4=
github.com/nbd-wtf/go-nostr v0.28.5/go.mod h1:aFcp8NO3erHg+glzBfh4wpaMrV1/ahcUPAgITdptxwA=
github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA=
github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -130,8 +132,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
@@ -139,8 +141,12 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/urfave/cli/v3 v3.0.0-alpha7 h1:dj+WjtBA2StTinGwue+o2oyFFvo8aQ/AGb5MYvUqk/8=
github.com/urfave/cli/v3 v3.0.0-alpha7/go.mod h1:0kK/RUFHyh+yIKSfWxwheGndfnrvYSmYFVeKCh03ZUc=
github.com/wI2L/jsondiff v0.5.1 h1:xS4zYUspH4U3IB0Lwo9+jv+MSRJSWMF87Y4BpDbFMHo=
github.com/wI2L/jsondiff v0.5.1/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

207
opensearch/opensearch.go Normal file
View File

@@ -0,0 +1,207 @@
package opensearch
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/fiatjaf/eventstore"
"github.com/nbd-wtf/go-nostr"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
)
var _ eventstore.Store = (*OpensearchStorage)(nil)
type IndexedEvent struct {
Event nostr.Event `json:"event"`
ContentSearch string `json:"content_search"`
}
var indexMapping = `
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"dynamic": false,
"properties": {
"event": {
"dynamic": false,
"properties": {
"id": {"type": "keyword"},
"pubkey": {"type": "keyword"},
"kind": {"type": "integer"},
"tags": {"type": "keyword"},
"created_at": {"type": "date"}
}
},
"content_search": {"type": "text"}
}
}
}
`
type OpensearchStorage struct {
URL string
IndexName string
Insecure bool
client *opensearchapi.Client
bi opensearchutil.BulkIndexer
}
func (oss *OpensearchStorage) Close() {}
func (oss *OpensearchStorage) Init() error {
if oss.IndexName == "" {
oss.IndexName = "events"
}
cfg := opensearchapi.Config{}
if oss.URL != "" {
cfg.Client.Addresses = strings.Split(oss.URL, ",")
}
if oss.Insecure {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
cfg.Client.Transport = transport
}
client, err := opensearchapi.NewClient(cfg)
if err != nil {
return err
}
ctx := context.Background()
createIndexResponse, err := client.Indices.Create(
ctx,
opensearchapi.IndicesCreateReq{
Index: oss.IndexName,
Body: strings.NewReader(indexMapping),
},
)
if err != nil {
var opensearchError *opensearch.StructError
// Load err into opensearch.Error to access the fields and tolerate if the index already exists
if errors.As(err, &opensearchError) {
if opensearchError.Err.Type != "resource_already_exists_exception" {
return err
}
} else {
return err
}
}
fmt.Printf("Created Index: %s\n Shards Acknowledged: %t\n", createIndexResponse.Index, createIndexResponse.ShardsAcknowledged)
// bulk indexer
bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: oss.IndexName,
Client: client,
NumWorkers: 2,
FlushInterval: 3 * time.Second,
})
if err != nil {
return fmt.Errorf("error creating the indexer: %s", err)
}
oss.client = client
oss.bi = bi
return nil
}
func (oss *OpensearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
done := make(chan error)
err := oss.bi.Add(
ctx,
opensearchutil.BulkIndexerItem{
Action: "delete",
DocumentID: evt.ID,
OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) {
close(done)
},
OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) {
if err != nil {
done <- err
} else {
// ok if deleted item not found
if res.Status == 404 {
close(done)
return
}
txt, _ := json.Marshal(res)
err := fmt.Errorf("ERROR: %s", txt)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}
func (oss *OpensearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
ie := &IndexedEvent{
Event: *evt,
}
// post processing: index for FTS
// some ideas:
// - index kind=0 fields a set of dedicated mapped fields
// (or use a separate index for profiles with a dedicated mapping)
// - if it's valid JSON just index the "values" and not the keys
// - more content introspection: language detection
// - denormalization... attach profile + ranking signals to events
if evt.Kind != 4 {
ie.ContentSearch = evt.Content
}
data, err := json.Marshal(ie)
if err != nil {
return err
}
done := make(chan error)
// adapted from:
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
err = oss.bi.Add(
ctx,
opensearchutil.BulkIndexerItem{
Action: "index",
DocumentID: evt.ID,
Body: bytes.NewReader(data),
OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) {
close(done)
},
OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) {
if err != nil {
done <- err
} else {
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
done <- err
}
},
},
)
if err != nil {
return err
}
err = <-done
return err
}

231
opensearch/query.go Normal file
View File

@@ -0,0 +1,231 @@
package opensearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"github.com/aquasecurity/esquery"
"github.com/nbd-wtf/go-nostr"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
)
func buildDsl(filter nostr.Filter) ([]byte, error) {
dsl := esquery.Bool()
prefixFilter := func(fieldName string, values []string) {
if len(values) == 0 {
return
}
prefixQ := esquery.Bool()
for _, v := range values {
if len(v) < 64 {
prefixQ.Should(esquery.Prefix(fieldName, v))
} else {
prefixQ.Should(esquery.Term(fieldName, v))
}
}
dsl.Must(prefixQ)
}
// ids
prefixFilter("event.id", filter.IDs)
// authors
prefixFilter("event.pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
}
// tags
if len(filter.Tags) > 0 {
tagQ := esquery.Bool()
for char, terms := range filter.Tags {
vs := toInterfaceSlice(append(terms, char))
tagQ.Should(esquery.Terms("event.tags", vs...))
}
dsl.Must(tagQ)
}
// since
if filter.Since != nil {
dsl.Must(esquery.Range("event.created_at").Gte(filter.Since))
}
// until
if filter.Until != nil {
dsl.Must(esquery.Range("event.created_at").Lte(filter.Until))
}
// search
if filter.Search != "" {
dsl.Must(esquery.Match("content_search", filter.Search))
}
return json.Marshal(esquery.Query(dsl))
}
func (oss *OpensearchStorage) getByID(filter nostr.Filter) ([]*nostr.Event, error) {
ctx := context.Background()
mgetResponse, err := oss.client.MGet(
ctx,
opensearchapi.MGetReq{
Body: opensearchutil.NewJSONReader(filter),
Index: oss.IndexName,
},
)
if err != nil {
return nil, err
}
events := make([]*nostr.Event, 0, len(mgetResponse.Docs))
for _, e := range mgetResponse.Docs {
if e.Found {
if b, err := e.Source.MarshalJSON(); err == nil {
var payload struct {
Event nostr.Event `json:"event"`
}
if err = json.Unmarshal(b, &payload); err == nil {
events = append(events, &payload.Event)
}
}
}
}
return events, nil
}
func (oss *OpensearchStorage) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
// optimization: get by id
if isGetByID(filter) {
if evts, err := oss.getByID(filter); err == nil {
for _, evt := range evts {
ch <- evt
}
close(ch)
} else {
return nil, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return nil, err
}
limit := 1000
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
ctx = context.Background()
searchResponse, err := oss.client.Search(
ctx,
&opensearchapi.SearchReq{
Indices: []string{oss.IndexName},
Body: bytes.NewReader(dsl),
Params: opensearchapi.SearchParams{
Size: opensearchapi.ToPointer(limit),
Sort: []string{"event.created_at:desc"},
},
},
)
if err != nil {
return nil, err
}
go func() {
for _, e := range searchResponse.Hits.Hits {
if b, err := e.Source.MarshalJSON(); err == nil {
var payload struct {
Event nostr.Event `json:"event"`
}
if err = json.Unmarshal(b, &payload); err == nil {
ch <- &payload.Event
}
}
}
close(ch)
}()
return ch, nil
}
func isGetByID(filter nostr.Filter) bool {
isGetById := len(filter.IDs) > 0 &&
len(filter.Authors) == 0 &&
len(filter.Kinds) == 0 &&
len(filter.Tags) == 0 &&
len(filter.Search) == 0 &&
filter.Since == nil &&
filter.Until == nil
if isGetById {
for _, id := range filter.IDs {
if len(id) != 64 {
return false
}
}
}
return isGetById
}
// from: https://stackoverflow.com/a/12754757
func toInterfaceSlice(slice interface{}) []interface{} {
s := reflect.ValueOf(slice)
if s.Kind() != reflect.Slice {
panic("InterfaceSlice() given a non-slice type")
}
// Keep the distinction between nil and empty slice input
if s.IsNil() {
return nil
}
ret := make([]interface{}, s.Len())
for i := 0; i < s.Len(); i++ {
ret[i] = s.Index(i).Interface()
}
return ret
}
func (oss *OpensearchStorage) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
count := int64(0)
// optimization: get by id
if isGetByID(filter) {
if evts, err := oss.getByID(filter); err == nil {
count += int64(len(evts))
} else {
return 0, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
if err != nil {
return 0, err
}
ctx = context.Background()
countRes, err := oss.client.Indices.Count(
ctx,
&opensearchapi.IndicesCountReq{
Indices: []string{oss.IndexName},
Body: bytes.NewReader(dsl),
},
)
if err != nil {
return 0, err
}
return int64(countRes.Count) + count, nil
}