document ratel, remove count
count is just stupid, and nobody uses it anyway. for now just commented out but eventually it should be cleared out
This commit is contained in:
@@ -43,10 +43,10 @@ func (b *Backend) Init(path string) (err error) { return b.Backend.Init(path) }
|
||||
// IC is a request/response API authing at each request.
|
||||
func (b *Backend) Close() (err error) { return b.Backend.Close() }
|
||||
|
||||
// CountEvents returns the number of events found matching the filter.
|
||||
func (b *Backend) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
return b.Backend.CountEvents(c, f)
|
||||
}
|
||||
// // CountEvents returns the number of events found matching the filter.
|
||||
// func (b *Backend) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
// return b.Backend.CountEvents(c, f)
|
||||
// }
|
||||
|
||||
// DeleteEvent removes an event from the event store.
|
||||
func (b *Backend) DeleteEvent(c context.T, eid *eventid.T, noTombstone ...bool) (err error) {
|
||||
|
||||
@@ -192,37 +192,37 @@ func (b *Backend) QueryEvents(c context.T, f *filter.T) (evs event.Ts, err error
|
||||
return
|
||||
}
|
||||
|
||||
// CountEvents counts how many events match on a filter, providing an approximate flag if either
|
||||
// of the layers return this, and the result is the maximum of the two layers results.
|
||||
func (b *Backend) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
var wg sync.WaitGroup
|
||||
var count1, count2 int
|
||||
var approx1, approx2 bool
|
||||
var err1, err2 error
|
||||
go func() {
|
||||
count1, approx1, err1 = b.L1.CountEvents(c, f)
|
||||
wg.Done()
|
||||
}()
|
||||
// because this is a low-data query we will wait until the L2 also gets a count,
|
||||
// which should be under a few hundred ms in most cases
|
||||
go func() {
|
||||
wg.Add(1)
|
||||
count2, approx2, err2 = b.L2.CountEvents(c, f)
|
||||
}()
|
||||
wg.Wait()
|
||||
// we return the maximum, it is assumed the L2 is authoritative, but it could be
|
||||
// the L1 has more for whatever reason, so return the maximum of the two.
|
||||
count = count1
|
||||
approx = approx1
|
||||
if count2 > count {
|
||||
count = count2
|
||||
// the approximate flag probably will be false if the L2 got more, and it is a
|
||||
// very large, non GC store.
|
||||
approx = approx2
|
||||
}
|
||||
err = errors.Join(err1, err2)
|
||||
return
|
||||
}
|
||||
// // CountEvents counts how many events match on a filter, providing an approximate flag if either
|
||||
// // of the layers return this, and the result is the maximum of the two layers results.
|
||||
// func (b *Backend) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
// var wg sync.WaitGroup
|
||||
// var count1, count2 int
|
||||
// var approx1, approx2 bool
|
||||
// var err1, err2 error
|
||||
// go func() {
|
||||
// count1, approx1, err1 = b.L1.CountEvents(c, f)
|
||||
// wg.Done()
|
||||
// }()
|
||||
// // because this is a low-data query we will wait until the L2 also gets a count,
|
||||
// // which should be under a few hundred ms in most cases
|
||||
// go func() {
|
||||
// wg.Add(1)
|
||||
// count2, approx2, err2 = b.L2.CountEvents(c, f)
|
||||
// }()
|
||||
// wg.Wait()
|
||||
// // we return the maximum, it is assumed the L2 is authoritative, but it could be
|
||||
// // the L1 has more for whatever reason, so return the maximum of the two.
|
||||
// count = count1
|
||||
// approx = approx1
|
||||
// if count2 > count {
|
||||
// count = count2
|
||||
// // the approximate flag probably will be false if the L2 got more, and it is a
|
||||
// // very large, non GC store.
|
||||
// approx = approx2
|
||||
// }
|
||||
// err = errors.Join(err1, err2)
|
||||
// return
|
||||
// }
|
||||
|
||||
// DeleteEvent deletes an event on both the layer1 and layer2.
|
||||
func (b *Backend) DeleteEvent(c context.T, ev *eventid.T, noTombstone ...bool) (err error) {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package ratel
|
||||
|
||||
// Close the database. If the Flatten flag was set, then trigger the flattening of tables before
|
||||
// shutting down.
|
||||
func (r *T) Close() (err error) {
|
||||
// chk.E(r.DB.Sync())
|
||||
r.WG.Wait()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"realy.lol/event"
|
||||
)
|
||||
|
||||
// Unmarshal an event from bytes, using compact encoding if configured.
|
||||
func (r *T) Unmarshal(ev *event.T, evb []byte) (rem []byte, err error) {
|
||||
if r.UseCompact {
|
||||
if rem, err = ev.UnmarshalCompact(evb); chk.E(err) {
|
||||
@@ -21,6 +22,7 @@ func (r *T) Unmarshal(ev *event.T, evb []byte) (rem []byte, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Marshal an event using compact encoding if configured.
|
||||
func (r *T) Marshal(ev *event.T, dst []byte) (b []byte) {
|
||||
if r.UseCompact {
|
||||
b = ev.MarshalCompact(dst)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"realy.lol/store"
|
||||
)
|
||||
|
||||
// SetConfiguration stores the store.Configuration value to a provided setting.
|
||||
func (r *T) SetConfiguration(c *store.Configuration) (err error) {
|
||||
var b []byte
|
||||
if b, err = json.Marshal(c); chk.E(err) {
|
||||
@@ -24,6 +25,7 @@ func (r *T) SetConfiguration(c *store.Configuration) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetConfiguration returns the current store.Configuration stored in the database.
|
||||
func (r *T) GetConfiguration() (c *store.Configuration, err error) {
|
||||
err = r.View(func(txn *badger.Txn) (err error) {
|
||||
c = &store.Configuration{BlockList: make([]string, 0)}
|
||||
|
||||
@@ -1,153 +1,135 @@
|
||||
package ratel
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
|
||||
"realy.lol/context"
|
||||
"realy.lol/event"
|
||||
"realy.lol/eventid"
|
||||
"realy.lol/filter"
|
||||
"realy.lol/ratel/keys/createdat"
|
||||
"realy.lol/ratel/keys/serial"
|
||||
"realy.lol/ratel/prefixes"
|
||||
"realy.lol/sha256"
|
||||
"realy.lol/tag"
|
||||
)
|
||||
|
||||
func (r *T) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
log.T.F("QueryEvents,%s", f.Serialize())
|
||||
var queries []query
|
||||
var extraFilter *filter.T
|
||||
var since uint64
|
||||
if queries, extraFilter, since, err = PrepareQueries(f); chk.E(err) {
|
||||
return
|
||||
}
|
||||
var delEvs [][]byte
|
||||
defer func() {
|
||||
// after the count delete any events that are expired as per NIP-40
|
||||
for _, d := range delEvs {
|
||||
chk.E(r.DeleteEvent(r.Ctx, eventid.NewWith(d)))
|
||||
}
|
||||
}()
|
||||
// search for the keys generated from the filter
|
||||
for _, q := range queries {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
var eventKey []byte
|
||||
err = r.View(func(txn *badger.Txn) (err error) {
|
||||
// iterate only through keys and in reverse order
|
||||
opts := badger.IteratorOptions{
|
||||
Reverse: true,
|
||||
}
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() {
|
||||
select {
|
||||
case <-r.Ctx.Done():
|
||||
return
|
||||
case <-c.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
item := it.Item()
|
||||
k := item.KeyCopy(nil)
|
||||
if !q.skipTS {
|
||||
if len(k) < createdat.Len+serial.Len {
|
||||
continue
|
||||
}
|
||||
createdAt := createdat.FromKey(k)
|
||||
if createdAt.Val.U64() < since {
|
||||
break
|
||||
}
|
||||
}
|
||||
// todo: here we should get the kind field from the key and and collate the
|
||||
// todo: matches that are replaceable/parameterized replaceable ones to decode
|
||||
// todo: to check for replacements so we can actually not set the approx flag.
|
||||
ser := serial.FromKey(k)
|
||||
eventKey = prefixes.Event.Key(ser)
|
||||
// eventKeys = append(eventKeys, idx)
|
||||
}
|
||||
return
|
||||
})
|
||||
if chk.E(err) {
|
||||
// this means shutdown, probably
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// todo: here we should decode replaceable events and discard the outdated versions
|
||||
if extraFilter != nil {
|
||||
// if there is an extra filter we need to fetch and decode the event to determine a
|
||||
// match.
|
||||
err = r.View(func(txn *badger.Txn) (err error) {
|
||||
opts := badger.IteratorOptions{Reverse: true}
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() {
|
||||
item := it.Item()
|
||||
if r.HasL2 && item.ValueSize() == sha256.Size {
|
||||
// we will count this though it may not match in fact. for general,
|
||||
// simple filters there isn't likely to be an extrafilter anyway. the
|
||||
// count result can have an "approximate" flag so we flip this now.
|
||||
approx = true
|
||||
return
|
||||
}
|
||||
ev := &event.T{}
|
||||
var appr bool
|
||||
if err = item.Value(func(eventValue []byte) (err error) {
|
||||
var rem []byte
|
||||
if rem, err = r.Unmarshal(ev, eventValue); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if len(rem) > 0 {
|
||||
log.T.S(rem)
|
||||
}
|
||||
if et := ev.Tags.GetFirst(tag.New("expiration")); et != nil {
|
||||
var exp uint64
|
||||
if exp, err = strconv.ParseUint(string(et.Value()), 10, 64); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if int64(exp) > time.Now().Unix() {
|
||||
// this needs to be deleted
|
||||
delEvs = append(delEvs, ev.Id)
|
||||
return
|
||||
}
|
||||
}
|
||||
if ev.Kind.IsReplaceable() ||
|
||||
(ev.Kind.IsParameterizedReplaceable() &&
|
||||
ev.Tags.GetFirst(tag.New("d")) != nil) {
|
||||
// we aren't going to spend this extra time so this just flips the
|
||||
// approximate flag. generally clients are asking for counts to get
|
||||
// an outside estimate anyway, to avoid exceeding MaxLimit
|
||||
appr = true
|
||||
}
|
||||
return
|
||||
}); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if ev == nil {
|
||||
continue
|
||||
}
|
||||
if extraFilter.Matches(ev) {
|
||||
count++
|
||||
if appr {
|
||||
approx = true
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
})
|
||||
} else {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (r *T) CountEvents(c context.T, f *filter.T) (count int, approx bool, err error) {
|
||||
// log.T.F("QueryEvents,%s", f.Serialize())
|
||||
// var queries []query
|
||||
// var extraFilter *filter.T
|
||||
// var since uint64
|
||||
// if queries, extraFilter, since, err = PrepareQueries(f); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// var delEvs [][]byte
|
||||
// defer func() {
|
||||
// // after the count delete any events that are expired as per NIP-40
|
||||
// for _, d := range delEvs {
|
||||
// chk.E(r.DeleteEvent(r.Ctx, eventid.NewWith(d)))
|
||||
// }
|
||||
// }()
|
||||
// // search for the keys generated from the filter
|
||||
// for _, q := range queries {
|
||||
// select {
|
||||
// case <-c.Done():
|
||||
// return
|
||||
// default:
|
||||
// }
|
||||
// var eventKey []byte
|
||||
// err = r.View(func(txn *badger.Txn) (err error) {
|
||||
// // iterate only through keys and in reverse order
|
||||
// opts := badger.IteratorOptions{
|
||||
// Reverse: true,
|
||||
// }
|
||||
// it := txn.NewIterator(opts)
|
||||
// defer it.Close()
|
||||
// for it.Seek(q.start); it.ValidForPrefix(q.searchPrefix); it.Next() {
|
||||
// select {
|
||||
// case <-r.Ctx.Done():
|
||||
// return
|
||||
// case <-c.Done():
|
||||
// return
|
||||
// default:
|
||||
// }
|
||||
// item := it.Item()
|
||||
// k := item.KeyCopy(nil)
|
||||
// if !q.skipTS {
|
||||
// if len(k) < createdat.Len+serial.Len {
|
||||
// continue
|
||||
// }
|
||||
// createdAt := createdat.FromKey(k)
|
||||
// if createdAt.Val.U64() < since {
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// // todo: here we should get the kind field from the key and and collate the
|
||||
// // todo: matches that are replaceable/parameterized replaceable ones to decode
|
||||
// // todo: to check for replacements so we can actually not set the approx flag.
|
||||
// ser := serial.FromKey(k)
|
||||
// eventKey = prefixes.Event.Key(ser)
|
||||
// // eventKeys = append(eventKeys, idx)
|
||||
// }
|
||||
// return
|
||||
// })
|
||||
// if chk.E(err) {
|
||||
// // this means shutdown, probably
|
||||
// if errors.Is(err, badger.ErrDBClosed) {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// // todo: here we should decode replaceable events and discard the outdated versions
|
||||
// if extraFilter != nil {
|
||||
// // if there is an extra filter we need to fetch and decode the event to determine a
|
||||
// // match.
|
||||
// err = r.View(func(txn *badger.Txn) (err error) {
|
||||
// opts := badger.IteratorOptions{Reverse: true}
|
||||
// it := txn.NewIterator(opts)
|
||||
// defer it.Close()
|
||||
// for it.Seek(eventKey); it.ValidForPrefix(eventKey); it.Next() {
|
||||
// item := it.Item()
|
||||
// if r.HasL2 && item.ValueSize() == sha256.Size {
|
||||
// // we will count this though it may not match in fact. for general,
|
||||
// // simple filters there isn't likely to be an extrafilter anyway. the
|
||||
// // count result can have an "approximate" flag so we flip this now.
|
||||
// approx = true
|
||||
// return
|
||||
// }
|
||||
// ev := &event.T{}
|
||||
// var appr bool
|
||||
// if err = item.Value(func(eventValue []byte) (err error) {
|
||||
// var rem []byte
|
||||
// if rem, err = r.Unmarshal(ev, eventValue); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// if len(rem) > 0 {
|
||||
// log.T.S(rem)
|
||||
// }
|
||||
// if et := ev.Tags.GetFirst(tag.New("expiration")); et != nil {
|
||||
// var exp uint64
|
||||
// if exp, err = strconv.ParseUint(string(et.Value()), 10, 64); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// if int64(exp) > time.Now().Unix() {
|
||||
// // this needs to be deleted
|
||||
// delEvs = append(delEvs, ev.Id)
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// if ev.Kind.IsReplaceable() ||
|
||||
// (ev.Kind.IsParameterizedReplaceable() &&
|
||||
// ev.Tags.GetFirst(tag.New("d")) != nil) {
|
||||
// // we aren't going to spend this extra time so this just flips the
|
||||
// // approximate flag. generally clients are asking for counts to get
|
||||
// // an outside estimate anyway, to avoid exceeding MaxLimit
|
||||
// appr = true
|
||||
// }
|
||||
// return
|
||||
// }); chk.E(err) {
|
||||
// continue
|
||||
// }
|
||||
// if ev == nil {
|
||||
// continue
|
||||
// }
|
||||
// if extraFilter.Matches(ev) {
|
||||
// count++
|
||||
// if appr {
|
||||
// approx = true
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// return
|
||||
// })
|
||||
// } else {
|
||||
// count++
|
||||
// }
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
@@ -16,9 +16,9 @@ import (
|
||||
"realy.lol/tag/atag"
|
||||
)
|
||||
|
||||
// GetTagKeyElements generates tag indexes from a tag key, tag value, created_at
|
||||
// Create_a_Tag generates tag indexes from a tag key, tag value, created_at
|
||||
// timestamp and the event serial.
|
||||
func GetTagKeyElements(tagKey, tagValue string, CA *createdat.T,
|
||||
func Create_a_Tag(tagKey, tagValue string, CA *createdat.T,
|
||||
ser *serial.T) (prf index.P, elems []keys.Element, err error) {
|
||||
|
||||
var pkb []byte
|
||||
@@ -38,7 +38,7 @@ func GetTagKeyElements(tagKey, tagValue string, CA *createdat.T,
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
// check for a tag
|
||||
// check for `a` tag
|
||||
if tagKey == "a" && strings.Count(tagValue, ":") == 2 {
|
||||
a := &atag.T{}
|
||||
var rem []byte
|
||||
@@ -16,6 +16,8 @@ import (
|
||||
"realy.lol/timestamp"
|
||||
)
|
||||
|
||||
// DeleteEvent deletes an event if it exists, and writes a tombstone for the event unless
|
||||
// requested not to, so that the event can't be saved again.
|
||||
func (r *T) DeleteEvent(c context.T, eid *eventid.T, noTombstone ...bool) (err error) {
|
||||
var foundSerial []byte
|
||||
seri := serial.New(nil)
|
||||
|
||||
@@ -19,6 +19,8 @@ import (
|
||||
"realy.lol/tags"
|
||||
)
|
||||
|
||||
// Export the complete database of stored events to an io.Writer in line structured minified
|
||||
// JSON.
|
||||
func (r *T) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
|
||||
var counter int
|
||||
var err error
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"realy.lol/tag"
|
||||
)
|
||||
|
||||
// FetchIds retrieves events based on a list of event Ids that have been provided.
|
||||
func (r *T) FetchIds(c context.T, evIds *tag.T, out io.Writer) (err error) {
|
||||
// create an ample buffer for decoding events, 100kb should usually be enough, if
|
||||
// it needs to get bigger it will be reallocated.
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
package ratel
|
||||
|
||||
import (
|
||||
"realy.lol/units"
|
||||
"time"
|
||||
|
||||
"realy.lol/units"
|
||||
)
|
||||
|
||||
// GarbageCollector starts up a ticker that runs a check on space utilisation
|
||||
@@ -10,6 +11,9 @@ import (
|
||||
//
|
||||
// This function should be invoked as a goroutine, and will terminate when the
|
||||
// backend context is canceled.
|
||||
//
|
||||
// TODO: this needs to be updated and set to actually run by default specifically just for
|
||||
// TODO: pruning tombstones after they are a year or more old.
|
||||
func (r *T) GarbageCollector() {
|
||||
log.D.F("starting ratel back-end garbage collector,"+
|
||||
"max size %0.3fGb,"+
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"realy.lol/sha256"
|
||||
)
|
||||
|
||||
// GCSweep runs the delete on all of the items that GCMark has determined should be deleted.
|
||||
func (r *T) GCSweep(evs, idxs DelItems) (err error) {
|
||||
// first we must gather all the indexes of the relevant events
|
||||
started := time.Now()
|
||||
|
||||
@@ -83,7 +83,8 @@ func GetIndexKeysForEvent(ev *event.T, ser *serial.T) (keyz [][]byte) {
|
||||
// get key prefix (with full length) and offset where to write the last
|
||||
// parts
|
||||
prf, elems := index.P(0), []keys.Element(nil)
|
||||
if prf, elems, err = GetTagKeyElements(string(t.F()[0]), string(t.F()[1]), CA, ser); chk.E(err) {
|
||||
if prf, elems, err = Create_a_Tag(string(t.F()[0]), string(t.F()[1]), CA,
|
||||
ser); chk.E(err) {
|
||||
log.I.F("%v", t.ToStringSlice())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
const maxLen = 500000000
|
||||
|
||||
// Import accepts an event
|
||||
// Import a collection of events in line structured minified JSON format (JSONL).
|
||||
func (r *T) Import(rr io.Reader) {
|
||||
r.Flatten = true
|
||||
var err error
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"realy.lol/units"
|
||||
)
|
||||
|
||||
// Init sets up the database with the loaded configuration.
|
||||
func (r *T) Init(path string) (err error) {
|
||||
r.dataDir = path
|
||||
log.I.Ln("opening ratel event store at", r.Path())
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"realy.lol/lol"
|
||||
)
|
||||
|
||||
// NewLogger creates a new badger logger.
|
||||
func NewLogger(logLevel int, label string) (l *logger) {
|
||||
log.T.Ln("getting logger for", label)
|
||||
l = &logger{Label: label}
|
||||
@@ -21,10 +22,12 @@ type logger struct {
|
||||
Label string
|
||||
}
|
||||
|
||||
// SetLogLevel atomically adjusts the log level to the given log level code.
|
||||
func (l *logger) SetLogLevel(level int) {
|
||||
l.Level.Store(int32(level))
|
||||
}
|
||||
|
||||
// Errorf is a log printer for this level of message.
|
||||
func (l *logger) Errorf(s string, i ...interface{}) {
|
||||
if l.Level.Load() >= lol.Error {
|
||||
s = l.Label + ": " + s
|
||||
@@ -34,6 +37,7 @@ func (l *logger) Errorf(s string, i ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Warningf is a log printer for this level of message.
|
||||
func (l *logger) Warningf(s string, i ...interface{}) {
|
||||
if l.Level.Load() >= lol.Warn {
|
||||
s = l.Label + ": " + s
|
||||
@@ -43,6 +47,7 @@ func (l *logger) Warningf(s string, i ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Infof is a log printer for this level of message.
|
||||
func (l *logger) Infof(s string, i ...interface{}) {
|
||||
if l.Level.Load() >= lol.Info {
|
||||
s = l.Label + ": " + s
|
||||
@@ -52,6 +57,7 @@ func (l *logger) Infof(s string, i ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Debugf is a log printer for this level of message.
|
||||
func (l *logger) Debugf(s string, i ...interface{}) {
|
||||
if l.Level.Load() >= lol.Debug {
|
||||
s = l.Label + ": " + s
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
|
||||
const DefaultMaxLimit = 512
|
||||
|
||||
// T is a badger event store database with layer2 and garbage collection.
|
||||
type T struct {
|
||||
Ctx context.T
|
||||
WG *sync.WaitGroup
|
||||
@@ -60,6 +61,7 @@ type T struct {
|
||||
|
||||
var _ store.I = (*T)(nil)
|
||||
|
||||
// BackendParams is the configurations used in creating a new ratel.T.
|
||||
type BackendParams struct {
|
||||
Ctx context.T
|
||||
WG *sync.WaitGroup
|
||||
@@ -69,8 +71,10 @@ type BackendParams struct {
|
||||
Extra []int
|
||||
}
|
||||
|
||||
// New configures a a new ratel.T event store.
|
||||
func New(p BackendParams, params ...int) *T {
|
||||
return GetBackend(p.Ctx, p.WG, p.HasL2, p.UseCompact, p.BlockCacheSize, p.LogLevel, p.MaxLimit,
|
||||
return GetBackend(p.Ctx, p.WG, p.HasL2, p.UseCompact, p.BlockCacheSize, p.LogLevel,
|
||||
p.MaxLimit,
|
||||
p.Compression, params...)
|
||||
}
|
||||
|
||||
@@ -120,6 +124,7 @@ func GetBackend(Ctx context.T, WG *sync.WaitGroup, hasL2, useCompact bool,
|
||||
return
|
||||
}
|
||||
|
||||
// Path returns the path where the database files are stored.
|
||||
func (r *T) Path() string { return r.dataDir }
|
||||
|
||||
// SerialKey returns a key used for storing events, and the raw serial counter
|
||||
|
||||
@@ -131,6 +131,7 @@ var FilterPrefixes = [][]byte{
|
||||
{FullIndex.B()},
|
||||
}
|
||||
|
||||
// AllPrefixes is used to do a full database nuke.
|
||||
var AllPrefixes = [][]byte{
|
||||
{Event.B()},
|
||||
{CreatedAt.B()},
|
||||
|
||||
@@ -151,18 +151,18 @@ func PrepareQueries(f *filter.T) (
|
||||
}
|
||||
}
|
||||
// log.T.S("kinds", qs)
|
||||
default: // todo: this is appearing on queries with only since/until
|
||||
log.I.F("nothing in filter, returning latest events")
|
||||
// if len(qs) > 0 {
|
||||
qs = append(qs, query{index: 0, queryFilter: f, searchPrefix: []byte{1},
|
||||
start: []byte{1, 255, 255, 255, 255, 255, 255, 255, 255},
|
||||
// })
|
||||
// qs = append(qs, query{index: 0, queryFilter: f,
|
||||
// searchPrefix: prefixes.CreatedAt.Key(),
|
||||
skipTS: true})
|
||||
ext = nil
|
||||
// }
|
||||
// log.T.S("other", qs)
|
||||
// default: // todo: this is appearing on queries with only since/until
|
||||
// log.I.F("nothing in filter, returning latest events")
|
||||
// // if len(qs) > 0 {
|
||||
// qs = append(qs, query{index: 0, queryFilter: f, searchPrefix: []byte{1},
|
||||
// start: []byte{1, 255, 255, 255, 255, 255, 255, 255, 255},
|
||||
// // })
|
||||
// // qs = append(qs, query{index: 0, queryFilter: f,
|
||||
// // searchPrefix: prefixes.CreatedAt.Key(),
|
||||
// skipTS: true})
|
||||
// ext = nil
|
||||
// // }
|
||||
// // log.T.S("other", qs)
|
||||
}
|
||||
|
||||
// this is where we'll end the iteration
|
||||
|
||||
@@ -1,113 +1,98 @@
|
||||
package realy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"realy.lol/context"
|
||||
"realy.lol/envelopes/authenvelope"
|
||||
"realy.lol/envelopes/closedenvelope"
|
||||
"realy.lol/envelopes/countenvelope"
|
||||
"realy.lol/kind"
|
||||
"realy.lol/normalize"
|
||||
"realy.lol/relay"
|
||||
"realy.lol/store"
|
||||
"realy.lol/tag"
|
||||
"realy.lol/web"
|
||||
)
|
||||
|
||||
func (s *Server) handleCount(c context.T, ws *web.Socket, req []byte, store store.I) (msg []byte) {
|
||||
counter, ok := store.(relay.EventCounter)
|
||||
if !ok {
|
||||
return normalize.Restricted.F("this relay does not support NIP-45")
|
||||
}
|
||||
var err error
|
||||
var rem []byte
|
||||
env := countenvelope.New()
|
||||
if rem, err = env.Unmarshal(req); chk.E(err) {
|
||||
return normalize.Error.F(err.Error())
|
||||
}
|
||||
if len(rem) > 0 {
|
||||
log.I.F("extra '%s'", rem)
|
||||
}
|
||||
if env.Subscription == nil || env.Subscription.String() == "" {
|
||||
return normalize.Error.F("COUNT has no <subscription id>")
|
||||
}
|
||||
allowed := env.Filters
|
||||
if accepter, ok := s.relay.(relay.ReqAcceptor); ok {
|
||||
var accepted, modified bool
|
||||
allowed, accepted, modified = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters,
|
||||
[]byte(ws.Authed()))
|
||||
if !accepted || allowed == nil || modified {
|
||||
var auther relay.Authenticator
|
||||
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() {
|
||||
ws.RequestAuth()
|
||||
if err = closedenvelope.NewFrom(env.Subscription,
|
||||
normalize.AuthRequired.F("auth required for count processing")).Write(ws); chk.E(err) {
|
||||
}
|
||||
log.I.F("requesting auth from client from %s", ws.RealRemote())
|
||||
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if !modified {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if allowed != env.Filters {
|
||||
defer func() {
|
||||
var auther relay.Authenticator
|
||||
var ok bool
|
||||
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
|
||||
// ws.RequestAuth()
|
||||
if err = closedenvelope.NewFrom(env.Subscription,
|
||||
normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) {
|
||||
}
|
||||
log.T.F("requesting auth from client from %s, challenge '%s'", ws.RealRemote(),
|
||||
ws.Challenge())
|
||||
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
var total int
|
||||
var approx bool
|
||||
if allowed != nil {
|
||||
for _, f := range allowed.F {
|
||||
var auther relay.Authenticator
|
||||
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
|
||||
if f.Kinds.Contains(kind.EncryptedDirectMessage) || f.Kinds.Contains(kind.GiftWrap) {
|
||||
senders := f.Authors
|
||||
receivers := f.Tags.GetAll(tag.New("p"))
|
||||
switch {
|
||||
case len(ws.Authed()) == 0:
|
||||
return normalize.Restricted.F("this realy does not serve kind-4 to unauthenticated users," + " does your client implement NIP-42?")
|
||||
case senders.Len() == 1 && receivers.Len() < 2 && bytes.Equal(senders.F()[0],
|
||||
[]byte(ws.Authed())):
|
||||
case receivers.Len() == 1 && senders.Len() < 2 && bytes.Equal(receivers.N(0).Value(),
|
||||
[]byte(ws.Authed())):
|
||||
default:
|
||||
return normalize.Restricted.F("authenticated user does not have" + " authorization for requested filters")
|
||||
}
|
||||
}
|
||||
}
|
||||
var count int
|
||||
count, approx, err = counter.CountEvents(c, f)
|
||||
if err != nil {
|
||||
log.E.F("store: %v", err)
|
||||
continue
|
||||
}
|
||||
total += count
|
||||
}
|
||||
}
|
||||
var res *countenvelope.Response
|
||||
if res, err = countenvelope.NewResponseFrom(env.Subscription.T, total, approx); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = res.Write(ws); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (s *Server) handleCount(c context.T, ws *web.Socket, req []byte, store store.I) (msg []byte) {
|
||||
// counter, ok := store.(relay.EventCounter)
|
||||
// if !ok {
|
||||
// return normalize.Restricted.F("this relay does not support NIP-45")
|
||||
// }
|
||||
// var err error
|
||||
// var rem []byte
|
||||
// env := countenvelope.New()
|
||||
// if rem, err = env.Unmarshal(req); chk.E(err) {
|
||||
// return normalize.Error.F(err.Error())
|
||||
// }
|
||||
// if len(rem) > 0 {
|
||||
// log.I.F("extra '%s'", rem)
|
||||
// }
|
||||
// if env.Subscription == nil || env.Subscription.String() == "" {
|
||||
// return normalize.Error.F("COUNT has no <subscription id>")
|
||||
// }
|
||||
// allowed := env.Filters
|
||||
// if accepter, ok := s.relay.(relay.ReqAcceptor); ok {
|
||||
// var accepted, modified bool
|
||||
// allowed, accepted, modified = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters,
|
||||
// []byte(ws.Authed()))
|
||||
// if !accepted || allowed == nil || modified {
|
||||
// var auther relay.Authenticator
|
||||
// if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() {
|
||||
// ws.RequestAuth()
|
||||
// if err = closedenvelope.NewFrom(env.Subscription,
|
||||
// normalize.AuthRequired.F("auth required for count processing")).Write(ws); chk.E(err) {
|
||||
// }
|
||||
// log.I.F("requesting auth from client from %s", ws.RealRemote())
|
||||
// if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// if !modified {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if allowed != env.Filters {
|
||||
// defer func() {
|
||||
// var auther relay.Authenticator
|
||||
// var ok bool
|
||||
// if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
|
||||
// // ws.RequestAuth()
|
||||
// if err = closedenvelope.NewFrom(env.Subscription,
|
||||
// normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) {
|
||||
// }
|
||||
// log.T.F("requesting auth from client from %s, challenge '%s'", ws.RealRemote(),
|
||||
// ws.Challenge())
|
||||
// if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
// }()
|
||||
// }
|
||||
// var total int
|
||||
// var approx bool
|
||||
// if allowed != nil {
|
||||
// for _, f := range allowed.F {
|
||||
// var auther relay.Authenticator
|
||||
// if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
|
||||
// if f.Kinds.Contains(kind.EncryptedDirectMessage) || f.Kinds.Contains(kind.GiftWrap) {
|
||||
// senders := f.Authors
|
||||
// receivers := f.Tags.GetAll(tag.New("p"))
|
||||
// switch {
|
||||
// case len(ws.Authed()) == 0:
|
||||
// return normalize.Restricted.F("this realy does not serve kind-4 to unauthenticated users," + " does your client implement NIP-42?")
|
||||
// case senders.Len() == 1 && receivers.Len() < 2 && bytes.Equal(senders.F()[0],
|
||||
// []byte(ws.Authed())):
|
||||
// case receivers.Len() == 1 && senders.Len() < 2 && bytes.Equal(receivers.N(0).Value(),
|
||||
// []byte(ws.Authed())):
|
||||
// default:
|
||||
// return normalize.Restricted.F("authenticated user does not have" + " authorization for requested filters")
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// var count int
|
||||
// count, approx, err = counter.CountEvents(c, f)
|
||||
// if err != nil {
|
||||
// log.E.F("store: %v", err)
|
||||
// continue
|
||||
// }
|
||||
// total += count
|
||||
// }
|
||||
// }
|
||||
// var res *countenvelope.Response
|
||||
// if res, err = countenvelope.NewResponseFrom(env.Subscription.T, total, approx); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// if err = res.Write(ws); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"realy.lol/envelopes"
|
||||
"realy.lol/envelopes/authenvelope"
|
||||
"realy.lol/envelopes/closeenvelope"
|
||||
"realy.lol/envelopes/countenvelope"
|
||||
"realy.lol/envelopes/eventenvelope"
|
||||
"realy.lol/envelopes/noticeenvelope"
|
||||
"realy.lol/envelopes/reqenvelope"
|
||||
@@ -97,7 +96,8 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
go s.pinger(ctx, ws, conn, ticker, cancel)
|
||||
}
|
||||
|
||||
func (s *Server) pinger(ctx context.T, ws *web.Socket, conn *websocket.Conn, ticker *time.Ticker, cancel context.F) {
|
||||
func (s *Server) pinger(ctx context.T, ws *web.Socket, conn *websocket.Conn,
|
||||
ticker *time.Ticker, cancel context.F) {
|
||||
defer func() {
|
||||
cancel()
|
||||
ticker.Stop()
|
||||
@@ -131,8 +131,8 @@ func (s *Server) handleMessage(c context.T, ws *web.Socket, msg []byte, sto stor
|
||||
switch t {
|
||||
case eventenvelope.L:
|
||||
notice = s.handleEvent(c, ws, rem, sto)
|
||||
case countenvelope.L:
|
||||
notice = s.handleCount(c, ws, rem, sto)
|
||||
// case countenvelope.L:
|
||||
// notice = s.handleCount(c, ws, rem, sto)
|
||||
case reqenvelope.L:
|
||||
notice = s.handleReq(c, ws, rem, sto)
|
||||
case closeenvelope.L:
|
||||
|
||||
@@ -61,7 +61,9 @@ func (x *Configuration) RegisterConfigurationSet(api huma.API) {
|
||||
if err = c.SetConfiguration(input.Body); chk.E(err) {
|
||||
return
|
||||
}
|
||||
x.ConfigurationMx.Lock()
|
||||
s.Configuration = input.Body
|
||||
x.ConfigurationMx.Unlock()
|
||||
}
|
||||
return
|
||||
})
|
||||
@@ -99,7 +101,9 @@ func (x *Configuration) RegisterConfigurationGet(api huma.API) {
|
||||
// if cfg, err = c.GetConfiguration(); chk.E(err) {
|
||||
// return
|
||||
// }
|
||||
x.ConfigurationMx.Lock()
|
||||
output = &ConfigurationOutput{Body: s.Configuration}
|
||||
x.ConfigurationMx.Unlock()
|
||||
// }
|
||||
return
|
||||
})
|
||||
|
||||
@@ -42,7 +42,8 @@ type Server struct {
|
||||
owners [][]byte
|
||||
Listeners *listeners.T
|
||||
huma.API
|
||||
Configuration *store.Configuration
|
||||
ConfigurationMx sync.Mutex
|
||||
Configuration *store.Configuration
|
||||
}
|
||||
|
||||
type ServerParams struct {
|
||||
@@ -103,8 +104,10 @@ func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) {
|
||||
|
||||
// load configuration if it has been set
|
||||
if c, ok := s.relay.Storage().(store.Configurationer); ok {
|
||||
s.ConfigurationMx.Lock()
|
||||
if s.Configuration, err = c.GetConfiguration(); chk.E(err) {
|
||||
}
|
||||
s.ConfigurationMx.Unlock()
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -22,7 +22,6 @@ type I interface {
|
||||
Pather
|
||||
Nukener
|
||||
Querent
|
||||
Counter
|
||||
Deleter
|
||||
Saver
|
||||
Importer
|
||||
@@ -68,11 +67,11 @@ type GetIdsWriter interface {
|
||||
FetchIds(c context.T, evIds *tag.T, out io.Writer) (err error)
|
||||
}
|
||||
|
||||
type Counter interface {
|
||||
// CountEvents performs the same work as QueryEvents but instead of delivering
|
||||
// the events that were found it just returns the count of events
|
||||
CountEvents(c context.T, f *filter.T) (count int, approx bool, err error)
|
||||
}
|
||||
// type Counter interface {
|
||||
// // CountEvents performs the same work as QueryEvents but instead of delivering
|
||||
// // the events that were found it just returns the count of events
|
||||
// CountEvents(c context.T, f *filter.T) (count int, approx bool, err error)
|
||||
// }
|
||||
|
||||
type Deleter interface {
|
||||
// DeleteEvent is used to handle deletion events, as per NIP-09.
|
||||
|
||||
213
vendor/github.com/danielgtaylor/huma/v2/sse/sse.go
generated
vendored
Normal file
213
vendor/github.com/danielgtaylor/huma/v2/sse/sse.go
generated
vendored
Normal file
@@ -0,0 +1,213 @@
|
||||
// Package sse provides utilities for working with Server Sent Events (SSE).
|
||||
package sse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/danielgtaylor/huma/v2"
|
||||
)
|
||||
|
||||
// WriteTimeout is the timeout for writing to the client.
|
||||
var WriteTimeout = 5 * time.Second
|
||||
|
||||
// deref follows pointers until it finds a non-pointer type.
|
||||
func deref(t reflect.Type) reflect.Type {
|
||||
for t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
type unwrapper interface {
|
||||
Unwrap() http.ResponseWriter
|
||||
}
|
||||
|
||||
type writeDeadliner interface {
|
||||
SetWriteDeadline(time.Time) error
|
||||
}
|
||||
|
||||
// Message is a single SSE message. There is no `event` field as this is
|
||||
// handled by the `eventTypeMap` when registering the operation.
|
||||
type Message struct {
|
||||
ID int
|
||||
Data any
|
||||
Retry int
|
||||
}
|
||||
|
||||
// Sender is a send function for sending SSE messages to the client. It is
|
||||
// callable but also provides a `sender.Data(...)` convenience method if
|
||||
// you don't need to set the other fields in the message.
|
||||
type Sender func(Message) error
|
||||
|
||||
// Data sends a message with the given data to the client. This is equivalent
|
||||
// to calling `sender(Message{Data: data})`.
|
||||
func (s Sender) Data(data any) error {
|
||||
return s(Message{Data: data})
|
||||
}
|
||||
|
||||
// Register a new SSE operation. The `eventTypeMap` maps from event name to
|
||||
// the type of the data that will be sent. The `f` function is called with
|
||||
// the context, input, and a `send` function that can be used to send messages
|
||||
// to the client. Flushing is handled automatically as long as the adapter's
|
||||
// `BodyWriter` implements `http.Flusher`.
|
||||
func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]any, f func(ctx context.Context, input *I, send Sender)) {
|
||||
// Start by defining the SSE schema & operation response.
|
||||
if op.Responses == nil {
|
||||
op.Responses = map[string]*huma.Response{}
|
||||
}
|
||||
if op.Responses["200"] == nil {
|
||||
op.Responses["200"] = &huma.Response{}
|
||||
}
|
||||
if op.Responses["200"].Content == nil {
|
||||
op.Responses["200"].Content = map[string]*huma.MediaType{}
|
||||
}
|
||||
|
||||
typeToEvent := make(map[reflect.Type]string, len(eventTypeMap))
|
||||
dataSchemas := make([]*huma.Schema, 0, len(eventTypeMap))
|
||||
for k, v := range eventTypeMap {
|
||||
vt := deref(reflect.TypeOf(v))
|
||||
typeToEvent[vt] = k
|
||||
required := []string{"data"}
|
||||
if k != "" && k != "message" {
|
||||
required = append(required, "event")
|
||||
}
|
||||
s := &huma.Schema{
|
||||
Title: "Event " + k,
|
||||
Type: huma.TypeObject,
|
||||
Properties: map[string]*huma.Schema{
|
||||
"id": {
|
||||
Type: huma.TypeInteger,
|
||||
Description: "The event ID.",
|
||||
},
|
||||
"event": {
|
||||
Type: huma.TypeString,
|
||||
Description: "The event name.",
|
||||
Extensions: map[string]interface{}{
|
||||
"const": k,
|
||||
},
|
||||
},
|
||||
"data": api.OpenAPI().Components.Schemas.Schema(vt, true, k),
|
||||
"retry": {
|
||||
Type: huma.TypeInteger,
|
||||
Description: "The retry time in milliseconds.",
|
||||
},
|
||||
},
|
||||
Required: required,
|
||||
}
|
||||
|
||||
dataSchemas = append(dataSchemas, s)
|
||||
}
|
||||
|
||||
schema := &huma.Schema{
|
||||
Title: "Server Sent Events",
|
||||
Description: "Each oneOf object in the array represents one possible Server Sent Events (SSE) message, serialized as UTF-8 text according to the SSE specification.",
|
||||
Type: huma.TypeArray,
|
||||
Items: &huma.Schema{
|
||||
Extensions: map[string]interface{}{
|
||||
"oneOf": dataSchemas,
|
||||
},
|
||||
},
|
||||
}
|
||||
op.Responses["200"].Content["text/event-stream"] = &huma.MediaType{
|
||||
Schema: schema,
|
||||
}
|
||||
|
||||
// Register the operation with the API, using the built-in streaming
|
||||
// response callback functionality. This will call the user's `f` function
|
||||
// and provide a `send` function to simplify sending messages.
|
||||
huma.Register(api, op, func(ctx context.Context, input *I) (*huma.StreamResponse, error) {
|
||||
return &huma.StreamResponse{
|
||||
Body: func(ctx huma.Context) {
|
||||
ctx.SetHeader("Content-Type", "text/event-stream")
|
||||
bw := ctx.BodyWriter()
|
||||
encoder := json.NewEncoder(bw)
|
||||
|
||||
// Get the flusher/deadliner from the response writer if possible.
|
||||
var flusher http.Flusher
|
||||
flushCheck := bw
|
||||
for {
|
||||
if f, ok := flushCheck.(http.Flusher); ok {
|
||||
flusher = f
|
||||
break
|
||||
}
|
||||
if u, ok := flushCheck.(unwrapper); ok {
|
||||
flushCheck = u.Unwrap()
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var deadliner writeDeadliner
|
||||
deadlineCheck := bw
|
||||
for {
|
||||
if d, ok := deadlineCheck.(writeDeadliner); ok {
|
||||
deadliner = d
|
||||
break
|
||||
}
|
||||
if u, ok := deadlineCheck.(unwrapper); ok {
|
||||
deadlineCheck = u.Unwrap()
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
send := func(msg Message) error {
|
||||
if deadliner != nil {
|
||||
if err := deadliner.SetWriteDeadline(time.Now().Add(WriteTimeout)); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "warning: unable to set write deadline: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Fprintln(os.Stderr, "write deadline not supported by underlying writer")
|
||||
}
|
||||
|
||||
// Write optional fields
|
||||
if msg.ID > 0 {
|
||||
bw.Write([]byte(fmt.Sprintf("id: %d\n", msg.ID)))
|
||||
}
|
||||
if msg.Retry > 0 {
|
||||
bw.Write([]byte(fmt.Sprintf("retry: %d\n", msg.Retry)))
|
||||
}
|
||||
|
||||
event, ok := typeToEvent[deref(reflect.TypeOf(msg.Data))]
|
||||
if !ok {
|
||||
fmt.Fprintf(os.Stderr, "error: unknown event type %v\n", reflect.TypeOf(msg.Data))
|
||||
debug.PrintStack()
|
||||
}
|
||||
if event != "" && event != "message" {
|
||||
// `message` is the default, so no need to transmit it.
|
||||
bw.Write([]byte("event: " + event + "\n"))
|
||||
}
|
||||
|
||||
// Write the message data.
|
||||
if _, err := bw.Write([]byte("data: ")); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(msg.Data); err != nil {
|
||||
bw.Write([]byte(`{"error": "encode error: `))
|
||||
bw.Write([]byte(err.Error()))
|
||||
bw.Write([]byte("\"}\n\n"))
|
||||
return err
|
||||
}
|
||||
bw.Write([]byte("\n"))
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
} else {
|
||||
fmt.Fprintln(os.Stderr, "error: unable to flush")
|
||||
return fmt.Errorf("unable to flush: %w", http.ErrNotSupported)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call the user-provided SSE handler.
|
||||
f(ctx.Context(), input, send)
|
||||
},
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user