package opensearch import ( "bytes" "context" "encoding/json" "errors" "fmt" "strings" "time" "github.com/fiatjaf/eventstore" "github.com/nbd-wtf/go-nostr" "github.com/opensearch-project/opensearch-go/v4" "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "github.com/opensearch-project/opensearch-go/v4/opensearchutil" ) var _ eventstore.Store = (*OpensearchStorage)(nil) type IndexedEvent struct { Event nostr.Event `json:"event"` ContentSearch string `json:"content_search"` } var indexMapping = ` { "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "dynamic": false, "properties": { "event": { "dynamic": false, "properties": { "id": {"type": "keyword"}, "pubkey": {"type": "keyword"}, "kind": {"type": "integer"}, "tags": {"type": "keyword"}, "created_at": {"type": "date"} } }, "content_search": {"type": "text"} } } } ` type OpensearchStorage struct { URL string IndexName string client *opensearchapi.Client bi opensearchutil.BulkIndexer } func (oss *OpensearchStorage) Close() {} func (oss *OpensearchStorage) Init() error { if oss.IndexName == "" { oss.IndexName = "events" } cfg := opensearchapi.Config{} if oss.URL != "" { cfg.Client.Addresses = strings.Split(oss.URL, ",") } client, err := opensearchapi.NewClient(cfg) if err != nil { return err } ctx := context.Background() createIndexResponse, err := client.Indices.Create( ctx, opensearchapi.IndicesCreateReq{ Index: oss.IndexName, Body: strings.NewReader(indexMapping), }, ) if err != nil { var opensearchError *opensearch.StructError // Load err into opensearch.Error to access the fields and tolerate if the index already exists if errors.As(err, &opensearchError) { if opensearchError.Err.Type != "resource_already_exists_exception" { return err } } else { return err } } fmt.Printf("Created Index: %s\n Shards Acknowledged: %t\n", createIndexResponse.Index, createIndexResponse.ShardsAcknowledged) // bulk indexer bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ Index: oss.IndexName, Client: client, NumWorkers: 2, FlushInterval: 3 * time.Second, }) if err != nil { return fmt.Errorf("error creating the indexer: %s", err) } oss.client = client oss.bi = bi return nil } func (oss *OpensearchStorage) DeleteEvent(ctx context.Context, evt *nostr.Event) error { done := make(chan error) err := oss.bi.Add( ctx, opensearchutil.BulkIndexerItem{ Action: "delete", DocumentID: evt.ID, OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) { close(done) }, OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) { if err != nil { done <- err } else { // ok if deleted item not found if res.Status == 404 { close(done) return } txt, _ := json.Marshal(res) err := fmt.Errorf("ERROR: %s", txt) done <- err } }, }, ) if err != nil { return err } err = <-done return err } func (oss *OpensearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error { ie := &IndexedEvent{ Event: *evt, } // post processing: index for FTS // some ideas: // - index kind=0 fields a set of dedicated mapped fields // (or use a separate index for profiles with a dedicated mapping) // - if it's valid JSON just index the "values" and not the keys // - more content introspection: language detection // - denormalization... attach profile + ranking signals to events if evt.Kind != 4 { ie.ContentSearch = evt.Content } data, err := json.Marshal(ie) if err != nil { return err } done := make(chan error) // adapted from: // https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196 err = oss.bi.Add( ctx, opensearchutil.BulkIndexerItem{ Action: "index", DocumentID: evt.ID, Body: bytes.NewReader(data), OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem) { close(done) }, OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchapi.BulkRespItem, err error) { if err != nil { done <- err } else { err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) done <- err } }, }, ) if err != nil { return err } err = <-done return err }