diff --git a/go.mod b/go.mod
index 758159d..d0c2554 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,8 @@ require (
 	github.com/mailru/easyjson v0.7.7
 	github.com/mattn/go-sqlite3 v1.14.18
 	github.com/nbd-wtf/go-nostr v0.28.5
-	github.com/stretchr/testify v1.8.4
+	github.com/opensearch-project/opensearch-go/v4 v4.0.0
+	github.com/stretchr/testify v1.9.0
 	github.com/urfave/cli/v3 v3.0.0-alpha7
 	go.etcd.io/bbolt v1.3.9
 	golang.org/x/exp v0.0.0-20231006140011-7918f672742d
diff --git a/go.sum b/go.sum
index ef2b903..2deebe6 100644
--- a/go.sum
+++ b/go.sum
@@ -116,6 +116,8 @@ github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+
 github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
 github.com/nbd-wtf/go-nostr v0.28.5 h1:5vBAFKGVJ6Rhq2Jrtj+v+j8bUVLdsao5SFdBIQ7PJR4=
 github.com/nbd-wtf/go-nostr v0.28.5/go.mod h1:aFcp8NO3erHg+glzBfh4wpaMrV1/ahcUPAgITdptxwA=
+github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA=
+github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -130,8 +132,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
-github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
-github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
 github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
@@ -139,8 +141,12 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
 github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
 github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
 github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
+github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
+github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
 github.com/urfave/cli/v3 v3.0.0-alpha7 h1:dj+WjtBA2StTinGwue+o2oyFFvo8aQ/AGb5MYvUqk/8=
 github.com/urfave/cli/v3 v3.0.0-alpha7/go.mod h1:0kK/RUFHyh+yIKSfWxwheGndfnrvYSmYFVeKCh03ZUc=
+github.com/wI2L/jsondiff v0.5.1 h1:xS4zYUspH4U3IB0Lwo9+jv+MSRJSWMF87Y4BpDbFMHo=
+github.com/wI2L/jsondiff v0.5.1/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s=
 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
diff --git a/opensearch/opensearch.go b/opensearch/opensearch.go
new file mode 100644
index 0000000..3986c1b
--- /dev/null
+++ b/opensearch/opensearch.go
@@ -0,0 +1,207 @@
+package opensearch
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/fiatjaf/eventstore"
+	"github.com/nbd-wtf/go-nostr"
+	"github.com/opensearch-project/opensearch-go/v4"
+	"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
+	"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
+)
+
+var _ eventstore.Store = (*OpensearchStorage)(nil)
+
+type IndexedEvent struct {
+	Event         nostr.Event `json:"event"`
+	ContentSearch string      `json:"content_search"`
+}
+
+var indexMapping = `
+{
+	"settings": {
+		"number_of_shards": 1,
+		"number_of_replicas": 0
+	},
+	"mappings": {
+		"dynamic": false,
+		"properties": {
+			"event": {
+				"dynamic": false,
+				"properties": {
+					"id": {"type": "keyword"},
+					"pubkey": {"type": "keyword"},
+					"kind": {"type": "integer"},
+					"tags": {"type": "keyword"},
+					"created_at": {"type": "date"}
+				}
+			},
+			"content_search": {"type": "text"}
+		}
+	}
+}
+`
+
+type OpensearchStorage struct {
+	URL       string
+	IndexName string
+	Insecure  bool
+
+	client *opensearchapi.Client
+	bi     opensearchutil.BulkIndexer
+}
+
+func (oss *OpensearchStorage) Close() {}
+
+func (oss *OpensearchStorage) Init() error {
+	if oss.IndexName == "" {
+		oss.IndexName = "events"
+	}
+
+	cfg := opensearchapi.Config{}
+	if oss.URL != "" {
+		cfg.Client.Addresses = strings.Split(oss.URL, ",")
+	}
+	if oss.Insecure {
+		transport := http.DefaultTransport.(*http.Transport).Clone()
+		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+		cfg.Client.Transport = transport
+	}
+
+	client, err := opensearchapi.NewClient(cfg)
+	if err != nil {
+		return err
+	}
+
+	ctx := context.Background()
+	createIndexResponse, err := client.Indices.Create(
+		ctx,
+		opensearchapi.IndicesCreateReq{
+			Index: oss.IndexName,
+			Body:  strings.NewReader(indexMapping),
+		},
+	)
+	if err != nil {
+		var opensearchError *opensearch.StructError
+
+		// Load err into opensearch.StructError to access its fields and
+		// tolerate the case where the index already exists
+		if errors.As(err, &opensearchError) {
+			if opensearchError.Err.Type != "resource_already_exists_exception" {
+				return err
+			}
+		} else {
+			return err
+		}
+	} else {
+		// only report creation when the call actually succeeded; the response
+		// is not valid when the index already existed
+		fmt.Printf("Created Index: %s\n Shards Acknowledged: %t\n", createIndexResponse.Index, createIndexResponse.ShardsAcknowledged)
+	}
+
+	// bulk indexer
+	bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
+		Index:         oss.IndexName,
+		Client:        client,
+		NumWorkers:    2,
+		FlushInterval: 3 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("error creating the indexer: %s", err)
+	}
+
+	oss.client = client
+	oss.bi = bi
+
+	return nil
+}
+
+func (oss *OpensearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
+	done := make(chan error)
+	err := oss.bi.Add(
+		ctx,
+		opensearchutil.BulkIndexerItem{
+			Action:     "delete",
+			DocumentID: evt.ID,
+			OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) {
+				close(done)
+			},
+			OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) {
+				if err != nil {
+					done <- err
+				} else {
+					// ok if deleted item not found
+					if res.Status == 404 {
+						close(done)
+						return
+					}
+					txt, _ := json.Marshal(res)
+					err := fmt.Errorf("ERROR: %s", txt)
+					done <- err
+				}
+			},
+		},
+	)
+	if err != nil {
+		return err
+	}
+
+	err = <-done
+	return err
+}
+
+func (oss *OpensearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
+	ie := &IndexedEvent{
+		Event: *evt,
+	}
+
+	// post processing: index for FTS
+	// some ideas:
+	// - index kind=0 fields as a set of dedicated mapped fields
+	//   (or use a separate index for profiles with a dedicated mapping)
+	// - if it's valid JSON just index the "values" and not the keys
+	// - more content introspection: language detection
+	// - denormalization... attach profile + ranking signals to events
+	if evt.Kind != 4 {
+		ie.ContentSearch = evt.Content
+	}
+
+	data, err := json.Marshal(ie)
+	if err != nil {
+		return err
+	}
+
+	done := make(chan error)
+
+	// adapted from:
+	// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
+	err = oss.bi.Add(
+		ctx,
+		opensearchutil.BulkIndexerItem{
+			Action:     "index",
+			DocumentID: evt.ID,
+			Body:       bytes.NewReader(data),
+			OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) {
+				close(done)
+			},
+			OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) {
+				if err != nil {
+					done <- err
+				} else {
+					err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
+					done <- err
+				}
+			},
+		},
+	)
+	if err != nil {
+		return err
+	}
+
+	err = <-done
+	return err
+}
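For orientation, here is a minimal sketch of how this backend would be wired up by a caller, assuming the package lands at github.com/fiatjaf/eventstore/opensearch and an OpenSearch node is reachable at https://localhost:9200; the URL, index name, and event values below are illustrative, not part of this change.

```go
package main

import (
	"context"
	"log"

	"github.com/fiatjaf/eventstore/opensearch" // assumed import path for this new package
	"github.com/nbd-wtf/go-nostr"
)

func main() {
	// hypothetical local node; Insecure mirrors the TLS-skipping branch in Init
	store := &opensearch.OpensearchStorage{
		URL:       "https://localhost:9200",
		IndexName: "events",
		Insecure:  true,
	}
	if err := store.Init(); err != nil {
		log.Fatalf("init: %v", err)
	}
	defer store.Close()

	// placeholder event: id/pubkey are dummies, not a validly signed event
	evt := &nostr.Event{
		ID:        "0000000000000000000000000000000000000000000000000000000000000000",
		PubKey:    "0000000000000000000000000000000000000000000000000000000000000001",
		CreatedAt: nostr.Now(),
		Kind:      1,
		Content:   "hello opensearch",
	}
	if err := store.SaveEvent(context.Background(), evt); err != nil {
		log.Fatalf("save: %v", err)
	}
}
```

Note that SaveEvent blocks until the bulk indexer flushes the item (its OnSuccess/OnFailure callback fires on flush), so with the 3-second FlushInterval configured in Init a single write can take a few seconds to return.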
diff --git a/opensearch/query.go b/opensearch/query.go
new file mode 100644
index 0000000..bcc1eaf
--- /dev/null
+++ b/opensearch/query.go
@@ -0,0 +1,231 @@
+package opensearch
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"reflect"
+
+	"github.com/aquasecurity/esquery"
+	"github.com/nbd-wtf/go-nostr"
+	"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
+	"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
+)
+
+func buildDsl(filter nostr.Filter) ([]byte, error) {
+	dsl := esquery.Bool()
+
+	prefixFilter := func(fieldName string, values []string) {
+		if len(values) == 0 {
+			return
+		}
+		prefixQ := esquery.Bool()
+		for _, v := range values {
+			if len(v) < 64 {
+				prefixQ.Should(esquery.Prefix(fieldName, v))
+			} else {
+				prefixQ.Should(esquery.Term(fieldName, v))
+			}
+		}
+		dsl.Must(prefixQ)
+	}
+
+	// ids
+	prefixFilter("event.id", filter.IDs)
+
+	// authors
+	prefixFilter("event.pubkey", filter.Authors)
+
+	// kinds
+	if len(filter.Kinds) > 0 {
+		dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
+	}
+
+	// tags
+	if len(filter.Tags) > 0 {
+		tagQ := esquery.Bool()
+		for char, terms := range filter.Tags {
+			vs := toInterfaceSlice(append(terms, char))
+			tagQ.Should(esquery.Terms("event.tags", vs...))
+		}
+		dsl.Must(tagQ)
+	}
+
+	// since
+	if filter.Since != nil {
+		dsl.Must(esquery.Range("event.created_at").Gte(filter.Since))
+	}
+
+	// until
+	if filter.Until != nil {
+		dsl.Must(esquery.Range("event.created_at").Lte(filter.Until))
+	}
+
+	// search
+	if filter.Search != "" {
+		dsl.Must(esquery.Match("content_search", filter.Search))
+	}
+
+	return json.Marshal(esquery.Query(dsl))
+}
+
+func (oss *OpensearchStorage) getByID(filter nostr.Filter) ([]*nostr.Event, error) {
+	ctx := context.Background()
+	mgetResponse, err := oss.client.MGet(
+		ctx,
+		opensearchapi.MGetReq{
+			Body:  opensearchutil.NewJSONReader(filter),
+			Index: oss.IndexName,
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	events := make([]*nostr.Event, 0, len(mgetResponse.Docs))
+	for _, e := range mgetResponse.Docs {
+		if e.Found {
+			if b, err := e.Source.MarshalJSON(); err == nil {
+				var payload struct {
+					Event nostr.Event `json:"event"`
+				}
+				if err = json.Unmarshal(b, &payload); err == nil {
+					events = append(events, &payload.Event)
+				}
+			}
+		}
+	}
+
+	return events, nil
+}
+
+func (oss *OpensearchStorage) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
+	ch := make(chan *nostr.Event)
+
+	// optimization: get by id
+	if isGetByID(filter) {
+		evts, err := oss.getByID(filter)
+		if err != nil {
+			return nil, fmt.Errorf("error getting by id: %w", err)
+		}
+		// deliver asynchronously: ch is unbuffered and the caller only starts
+		// reading after QueryEvents returns
+		go func() {
+			for _, evt := range evts {
+				ch <- evt
+			}
+			close(ch)
+		}()
+		return ch, nil
+	}
+
+	dsl, err := buildDsl(filter)
+	if err != nil {
+		return nil, err
+	}
+
+	limit := 1000
+	if filter.Limit > 0 && filter.Limit < limit {
+		limit = filter.Limit
+	}
+
+	ctx = context.Background()
+	searchResponse, err := oss.client.Search(
+		ctx,
+		&opensearchapi.SearchReq{
+			Indices: []string{oss.IndexName},
+			Body:    bytes.NewReader(dsl),
+			Params: opensearchapi.SearchParams{
+				Size: opensearchapi.ToPointer(limit),
+				Sort: []string{"event.created_at:desc"},
+			},
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	go func() {
+		for _, e := range searchResponse.Hits.Hits {
+			if b, err := e.Source.MarshalJSON(); err == nil {
+				var payload struct {
+					Event nostr.Event `json:"event"`
+				}
+				if err = json.Unmarshal(b, &payload); err == nil {
+					ch <- &payload.Event
+				}
+			}
+		}
+		close(ch)
+	}()
+
+	return ch, nil
+}
+
+func isGetByID(filter nostr.Filter) bool {
+	isGetById := len(filter.IDs) > 0 &&
+		len(filter.Authors) == 0 &&
+		len(filter.Kinds) == 0 &&
+		len(filter.Tags) == 0 &&
+		len(filter.Search) == 0 &&
+		filter.Since == nil &&
+		filter.Until == nil
+
+	if isGetById {
+		for _, id := range filter.IDs {
+			if len(id) != 64 {
+				return false
+			}
+		}
+	}
+	return isGetById
+}
+
+// from: https://stackoverflow.com/a/12754757
+func toInterfaceSlice(slice interface{}) []interface{} {
+	s := reflect.ValueOf(slice)
+	if s.Kind() != reflect.Slice {
+		panic("InterfaceSlice() given a non-slice type")
+	}
+
+	// Keep the distinction between nil and empty slice input
+	if s.IsNil() {
+		return nil
+	}
+
+	ret := make([]interface{}, s.Len())
+
+	for i := 0; i < s.Len(); i++ {
+		ret[i] = s.Index(i).Interface()
+	}
+
+	return ret
+}
+
+func (oss *OpensearchStorage) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
+	// optimization: get by id, returning early so the ids are not counted twice
+	if isGetByID(filter) {
+		evts, err := oss.getByID(filter)
+		if err != nil {
+			return 0, fmt.Errorf("error getting by id: %w", err)
+		}
+		return int64(len(evts)), nil
+	}
+
+	dsl, err := buildDsl(filter)
+	if err != nil {
+		return 0, err
+	}
+
+	ctx = context.Background()
+	countRes, err := oss.client.Indices.Count(
+		ctx,
+		&opensearchapi.IndicesCountReq{
+			Indices: []string{oss.IndexName},
+			Body:    bytes.NewReader(dsl),
+		},
+	)
+	if err != nil {
+		return 0, err
+	}
+
+	return int64(countRes.Count), nil
+}
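To round out the read path, a rough sketch of how QueryEvents would be consumed, under the same assumptions as the write-side sketch above (import path, local node); the filter values are made up. Results arrive newest-first because of the event.created_at:desc sort, and the channel is closed once all hits have been delivered.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/fiatjaf/eventstore/opensearch" // assumed import path, as above
	"github.com/nbd-wtf/go-nostr"
)

func main() {
	store := &opensearch.OpensearchStorage{URL: "https://localhost:9200", Insecure: true}
	if err := store.Init(); err != nil {
		log.Fatalf("init: %v", err)
	}

	// illustrative filter: kind-1 notes whose content matches "opensearch",
	// at most 20 of them (Limit caps the search Size, which defaults to 1000)
	filter := nostr.Filter{
		Kinds:  []int{1},
		Search: "opensearch",
		Limit:  20,
	}

	ch, err := store.QueryEvents(context.Background(), filter)
	if err != nil {
		log.Fatalf("query: %v", err)
	}
	for evt := range ch { // closed by the worker goroutine after the last hit
		fmt.Println(evt.CreatedAt, evt.ID, evt.Content)
	}
}
```

The Search term is matched against content_search, which the index mapping declares as a text field, while id, pubkey, kind, and tags hit the keyword/integer mappings that buildDsl targets.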