117 lines
2.4 KiB
Go
117 lines
2.4 KiB
Go
package socketapi
|
|
|
|
import (
|
|
"errors"
|
|
|
|
"github.com/dgraph-io/badger/v4"
|
|
|
|
"realy.lol/chk"
|
|
"realy.lol/context"
|
|
"realy.lol/envelopes/eoseenvelope"
|
|
"realy.lol/envelopes/eventenvelope"
|
|
"realy.lol/envelopes/reqenvelope"
|
|
"realy.lol/event"
|
|
"realy.lol/log"
|
|
"realy.lol/publish"
|
|
"realy.lol/realy/interfaces"
|
|
"realy.lol/realy/pointers"
|
|
"realy.lol/reason"
|
|
)
|
|
|
|
func (a *A) HandleReq(c context.T, req []byte, srv interfaces.Server, remote string) (r []byte) {
|
|
|
|
sto := srv.Storage()
|
|
var err error
|
|
var rem []byte
|
|
env := reqenvelope.New()
|
|
if rem, err = env.Unmarshal(req); chk.E(err) {
|
|
return reason.Error.F(err.Error())
|
|
}
|
|
if len(rem) > 0 {
|
|
log.I.F("extra '%s'", rem)
|
|
}
|
|
allowed := env.Filters
|
|
var accepted, modified bool
|
|
allowed, accepted, modified = srv.AcceptReq(c, a.Listener.Req(), env.Subscription.T,
|
|
env.Filters, remote)
|
|
if !accepted || allowed == nil || modified {
|
|
}
|
|
var notice []byte
|
|
if allowed == nil {
|
|
return
|
|
}
|
|
for _, f := range allowed.F {
|
|
var i uint
|
|
if pointers.Present(f.Limit) {
|
|
if *f.Limit == 0 {
|
|
continue
|
|
}
|
|
i = *f.Limit
|
|
}
|
|
var events event.Ts
|
|
if events, err = sto.QueryEvents(c, f); err != nil {
|
|
log.E.F("eventstore: %v", err)
|
|
if errors.Is(err, badger.ErrDBClosed) {
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
if len(notice) > 0 {
|
|
return notice
|
|
}
|
|
if len(events) == 0 {
|
|
continue
|
|
}
|
|
if err = a.WriteEvents(events, env, int(i)); chk.E(err) {
|
|
}
|
|
// write out the events to the socket
|
|
for _, ev := range events {
|
|
i--
|
|
if i < 0 {
|
|
break
|
|
}
|
|
var res *eventenvelope.Result
|
|
if res, err = eventenvelope.NewResultWith(env.Subscription.T,
|
|
ev); chk.E(err) {
|
|
return
|
|
}
|
|
if err = res.Write(a.Listener); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
if err = eoseenvelope.NewFrom(env.Subscription).Write(a.Listener); chk.E(err) {
|
|
return
|
|
}
|
|
if env.Filters != allowed {
|
|
return
|
|
}
|
|
receiver := make(event.C, 32)
|
|
publish.P.Receive(&W{
|
|
Listener: a.Listener,
|
|
Id: env.Subscription.String(),
|
|
Receiver: receiver,
|
|
Filters: env.Filters,
|
|
})
|
|
return
|
|
}
|
|
|
|
func (a *A) WriteEvents(events event.Ts, env *reqenvelope.T, i int) (err error) {
|
|
// write out the events to the socket
|
|
for _, ev := range events {
|
|
i--
|
|
if i < 0 {
|
|
break
|
|
}
|
|
var res *eventenvelope.Result
|
|
if res, err = eventenvelope.NewResultWith(env.Subscription.T,
|
|
ev); chk.E(err) {
|
|
return
|
|
}
|
|
if err = res.Write(a.Listener); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|