Add countenvelope encoder with marshal/unmarshal support, buffer pool integration, and tests; improve error messages for subscription validation
This commit is contained in:
206
pkg/encoders/envelopes/countenvelope/countenvelope.go
Normal file
206
pkg/encoders/envelopes/countenvelope/countenvelope.go
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
// Package countenvelope is an encoder for the COUNT request (client) and
|
||||||
|
// response (relay) message types.
|
||||||
|
package countenvelope
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"lol.mleku.dev/chk"
|
||||||
|
"lol.mleku.dev/errorf"
|
||||||
|
"next.orly.dev/pkg/encoders/envelopes"
|
||||||
|
"next.orly.dev/pkg/encoders/filter"
|
||||||
|
"next.orly.dev/pkg/encoders/ints"
|
||||||
|
"next.orly.dev/pkg/encoders/text"
|
||||||
|
"next.orly.dev/pkg/interfaces/codec"
|
||||||
|
)
|
||||||
|
|
||||||
|
// L is the label associated with this type of codec.Envelope.
|
||||||
|
const L = "COUNT"
|
||||||
|
|
||||||
|
// Request is a COUNT envelope sent by a client to request a count of results.
|
||||||
|
// This is a stupid idea because it costs as much processing as fetching the
|
||||||
|
// events, but doesn't provide the means to actually get them (the HTTP API
|
||||||
|
// /filter does this by returning the actual event Ids).
|
||||||
|
type Request struct {
|
||||||
|
Subscription []byte
|
||||||
|
Filters filter.S
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ codec.Envelope = (*Request)(nil)
|
||||||
|
|
||||||
|
// New creates a new Request with a standard style subscription.Id and empty filter.
|
||||||
|
func New() *Request { return new(Request) }
|
||||||
|
|
||||||
|
// NewRequest creates a new Request with a provided subscription.Id and
|
||||||
|
// filter.T.
|
||||||
|
func NewRequest(id []byte, filters filter.S) *Request {
|
||||||
|
return &Request{
|
||||||
|
Subscription: id,
|
||||||
|
Filters: filters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Label returns the label of a CLOSED envelope.
|
||||||
|
func (en *Request) Label() string { return L }
|
||||||
|
|
||||||
|
// Write the Request to a provided io.Writer.
|
||||||
|
func (en *Request) Write(w io.Writer) (err error) {
|
||||||
|
var b []byte
|
||||||
|
b = en.Marshal(b)
|
||||||
|
_, err = w.Write(b)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Marshal a Request appended to the provided destination slice as minified
|
||||||
|
// JSON.
|
||||||
|
func (en *Request) Marshal(dst []byte) (b []byte) {
|
||||||
|
var err error
|
||||||
|
b = dst
|
||||||
|
b = envelopes.Marshal(
|
||||||
|
b, L,
|
||||||
|
func(bst []byte) (o []byte) {
|
||||||
|
o = bst
|
||||||
|
o = append(o, '"')
|
||||||
|
o = append(o, en.Subscription...)
|
||||||
|
o = append(o, '"')
|
||||||
|
o = append(o, ',')
|
||||||
|
for _, f := range en.Filters {
|
||||||
|
o = append(o, ',')
|
||||||
|
o = f.Marshal(o)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
)
|
||||||
|
_ = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal a Request from minified JSON, returning the remainder after the end
|
||||||
|
// of the envelope.
|
||||||
|
func (en *Request) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
|
r = b
|
||||||
|
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r, err = en.Filters.Unmarshal(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseRequest reads a Request in minified JSON into a newly allocated Request.
|
||||||
|
func ParseRequest(b []byte) (t *Request, rem []byte, err error) {
|
||||||
|
t = New()
|
||||||
|
if rem, err = t.Unmarshal(b); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response is a COUNT Response returning a count and approximate flag
|
||||||
|
// associated with the REQ subscription.Id.
|
||||||
|
type Response struct {
|
||||||
|
Subscription []byte
|
||||||
|
Count int
|
||||||
|
Approximate bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ codec.Envelope = (*Response)(nil)
|
||||||
|
|
||||||
|
// NewResponse creates a new empty countenvelope.Response with a standard formatted
|
||||||
|
// subscription.Id.
|
||||||
|
func NewResponse() *Response { return new(Response) }
|
||||||
|
|
||||||
|
// NewResponseFrom creates a new countenvelope.Response with provided string for the
|
||||||
|
// subscription.Id, a count and optional variadic approximate flag, which is
|
||||||
|
// otherwise false and does not get rendered into the JSON.
|
||||||
|
func NewResponseFrom[V string | []byte](
|
||||||
|
s V, cnt int,
|
||||||
|
approx ...bool,
|
||||||
|
) (res *Response, err error) {
|
||||||
|
var a bool
|
||||||
|
if len(approx) > 0 {
|
||||||
|
a = approx[0]
|
||||||
|
}
|
||||||
|
if len(s) < 0 || len(s) > 64 {
|
||||||
|
err = errorf.E("subscription id must be length > 0 and <= 64")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return &Response{[]byte(s), cnt, a}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Label returns the COUNT label associated with a Response.
|
||||||
|
func (en *Response) Label() string { return L }
|
||||||
|
|
||||||
|
// Write a Response to a provided io.Writer as minified JSON.
|
||||||
|
func (en *Response) Write(w io.Writer) (err error) {
|
||||||
|
_, err = w.Write(en.Marshal(nil))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Marshal a countenvelope.Response envelope in minified JSON, appending to a
|
||||||
|
// provided destination slice.
|
||||||
|
func (en *Response) Marshal(dst []byte) (b []byte) {
|
||||||
|
b = dst
|
||||||
|
b = envelopes.Marshal(
|
||||||
|
b, L,
|
||||||
|
func(bst []byte) (o []byte) {
|
||||||
|
o = bst
|
||||||
|
o = append(o, '"')
|
||||||
|
o = append(o, en.Subscription...)
|
||||||
|
o = append(o, '"')
|
||||||
|
o = append(o, ',')
|
||||||
|
c := ints.New(en.Count)
|
||||||
|
o = c.Marshal(o)
|
||||||
|
if en.Approximate {
|
||||||
|
o = append(o, ',')
|
||||||
|
o = append(o, "true"...)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal a COUNT Response from minified JSON, returning the remainder after
|
||||||
|
// the end of the envelope.
|
||||||
|
func (en *Response) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
|
r = b
|
||||||
|
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r, err = text.Comma(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
i := ints.New(0)
|
||||||
|
if r, err = i.Unmarshal(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
en.Count = int(i.N)
|
||||||
|
if len(r) > 0 {
|
||||||
|
if r[0] == ',' {
|
||||||
|
r = r[1:]
|
||||||
|
if bytes.HasPrefix(r, []byte("true")) {
|
||||||
|
en.Approximate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse reads a Count Response in minified JSON into a newly allocated
|
||||||
|
// countenvelope.Response.
|
||||||
|
func Parse(b []byte) (t *Response, rem []byte, err error) {
|
||||||
|
t = NewResponse()
|
||||||
|
if rem, err = t.Unmarshal(b); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
135
pkg/encoders/envelopes/countenvelope/countenvelope_test.go
Normal file
135
pkg/encoders/envelopes/countenvelope/countenvelope_test.go
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
package countenvelope
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"lol.mleku.dev/chk"
|
||||||
|
"lukechampine.com/frand"
|
||||||
|
"next.orly.dev/pkg/encoders/envelopes"
|
||||||
|
"next.orly.dev/pkg/encoders/filter"
|
||||||
|
"next.orly.dev/pkg/utils"
|
||||||
|
"next.orly.dev/pkg/utils/bufpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRequest(t *testing.T) {
|
||||||
|
var err error
|
||||||
|
for i := range 1000 {
|
||||||
|
rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get()
|
||||||
|
var f filter.S
|
||||||
|
if f, err = filter.GenFilters(); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
s := utils.NewSubscription(i)
|
||||||
|
req := NewRequest(s, f)
|
||||||
|
rb = req.Marshal(rb)
|
||||||
|
rb1 = append(rb1, rb...)
|
||||||
|
var rem []byte
|
||||||
|
var l string
|
||||||
|
if l, rb, err = envelopes.Identify(rb); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if l != L {
|
||||||
|
t.Fatalf("invalid sentinel %s, expect %s", l, L)
|
||||||
|
}
|
||||||
|
req2 := New()
|
||||||
|
if rem, err = req2.Unmarshal(rb); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(rem) > 0 {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, remainder\n%d %s",
|
||||||
|
len(rem), rem,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
rb2 = req2.Marshal(rb2)
|
||||||
|
if !utils.FastEqual(rb1, rb2) {
|
||||||
|
if len(rb1) != len(rb2) {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, different lengths\n%d %s\n%d %s\n",
|
||||||
|
len(rb1), rb1, len(rb2), rb2,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
for i := range rb1 {
|
||||||
|
if rb1[i] != rb2[i] {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n",
|
||||||
|
i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i],
|
||||||
|
rb2[i:],
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed\n%d %s\n%d %s\n",
|
||||||
|
len(rb1), rb1, len(rb2), rb2,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
bufpool.Put(rb1)
|
||||||
|
bufpool.Put(rb2)
|
||||||
|
bufpool.Put(rb)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResponse(t *testing.T) {
|
||||||
|
var err error
|
||||||
|
for i := range 1000 {
|
||||||
|
rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get()
|
||||||
|
s := utils.NewSubscription(i)
|
||||||
|
var res *Response
|
||||||
|
if i&2 == 0 {
|
||||||
|
if res, err = NewResponseFrom(
|
||||||
|
s, frand.Intn(200), true,
|
||||||
|
); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if res, err = NewResponseFrom(s, frand.Intn(200)); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rb = res.Marshal(rb)
|
||||||
|
rb1 = append(rb1, rb...)
|
||||||
|
var rem []byte
|
||||||
|
var l string
|
||||||
|
if l, rb, err = envelopes.Identify(rb); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if l != L {
|
||||||
|
t.Fatalf("invalid sentinel %s, expect %s", l, L)
|
||||||
|
}
|
||||||
|
res2 := NewResponse()
|
||||||
|
if rem, err = res2.Unmarshal(rb); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(rem) > 0 {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, remainder\n%d %s",
|
||||||
|
len(rem), rem,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
rb2 = res2.Marshal(rb2)
|
||||||
|
if !utils.FastEqual(rb1, rb2) {
|
||||||
|
if len(rb1) != len(rb2) {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, different lengths\n%d %s\n%d %s\n",
|
||||||
|
len(rb1), rb1, len(rb2), rb2,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
for i := range rb1 {
|
||||||
|
if rb1[i] != rb2[i] {
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n",
|
||||||
|
i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i],
|
||||||
|
rb2[i:],
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Fatalf(
|
||||||
|
"unmarshal failed\n%d %s\n%d %s\n",
|
||||||
|
len(rb1), rb1, len(rb2), rb2,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
bufpool.Put(rb1)
|
||||||
|
bufpool.Put(rb2)
|
||||||
|
bufpool.Put(rb)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -41,7 +41,7 @@ func NewFrom[V string | []byte](eid V, ok bool, msg ...V) *T {
|
|||||||
}
|
}
|
||||||
if len(eid) != sha256.Size {
|
if len(eid) != sha256.Size {
|
||||||
log.W.F(
|
log.W.F(
|
||||||
"event ID unexpected length, expect %d got %d",
|
"event Subscription unexpected length, expect %d got %d",
|
||||||
len(eid), sha256.Size,
|
len(eid), sha256.Size,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -98,7 +98,7 @@ func (en *T) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
}
|
}
|
||||||
if len(idBytes) != sha256.Size {
|
if len(idBytes) != sha256.Size {
|
||||||
err = errorf.E(
|
err = errorf.E(
|
||||||
"invalid size for ID, require %d got %d",
|
"invalid size for Subscription, require %d got %d",
|
||||||
sha256.Size, len(idBytes),
|
sha256.Size, len(idBytes),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -87,7 +87,6 @@ func (en *T) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if r, err = text.Comma(r); chk.E(err) {
|
if r, err = text.Comma(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,14 +7,13 @@ import (
|
|||||||
"next.orly.dev/pkg/encoders/envelopes"
|
"next.orly.dev/pkg/encoders/envelopes"
|
||||||
"next.orly.dev/pkg/encoders/filter"
|
"next.orly.dev/pkg/encoders/filter"
|
||||||
"next.orly.dev/pkg/utils"
|
"next.orly.dev/pkg/utils"
|
||||||
|
"next.orly.dev/pkg/utils/bufpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMarshalUnmarshal(t *testing.T) {
|
func TestMarshalUnmarshal(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
rb, rb1, rb2 := make([]byte, 0, 65535), make([]byte, 0, 65535), make(
|
|
||||||
[]byte, 0, 65535,
|
|
||||||
)
|
|
||||||
for i := range 1000 {
|
for i := range 1000 {
|
||||||
|
rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get()
|
||||||
var f filter.S
|
var f filter.S
|
||||||
if f, err = filter.GenFilters(); chk.E(err) {
|
if f, err = filter.GenFilters(); chk.E(err) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -22,8 +21,7 @@ func TestMarshalUnmarshal(t *testing.T) {
|
|||||||
s := utils.NewSubscription(i)
|
s := utils.NewSubscription(i)
|
||||||
req := NewFrom(s, f)
|
req := NewFrom(s, f)
|
||||||
rb = req.Marshal(rb)
|
rb = req.Marshal(rb)
|
||||||
rb1 = rb1[:len(rb)]
|
rb1 = append(rb1, rb...)
|
||||||
copy(rb1, rb)
|
|
||||||
var rem []byte
|
var rem []byte
|
||||||
var l string
|
var l string
|
||||||
if l, rb, err = envelopes.Identify(rb); chk.E(err) {
|
if l, rb, err = envelopes.Identify(rb); chk.E(err) {
|
||||||
@@ -64,6 +62,8 @@ func TestMarshalUnmarshal(t *testing.T) {
|
|||||||
len(rb1), rb1, len(rb2), rb2,
|
len(rb1), rb1, len(rb2), rb2,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
rb, rb1, rb2 = rb[:0], rb1[:0], rb2[:0]
|
bufpool.Put(rb1)
|
||||||
|
bufpool.Put(rb2)
|
||||||
|
bufpool.Put(rb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -297,7 +297,7 @@ InVal:
|
|||||||
}
|
}
|
||||||
if len(id) != sha256.Size {
|
if len(id) != sha256.Size {
|
||||||
err = errorf.E(
|
err = errorf.E(
|
||||||
"invalid ID, require %d got %d", sha256.Size,
|
"invalid Subscription, require %d got %d", sha256.Size,
|
||||||
len(id),
|
len(id),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -34,13 +34,13 @@ func (ev *E) Verify() (valid bool, err error) {
|
|||||||
// check that this isn't because of a bogus ID
|
// check that this isn't because of a bogus ID
|
||||||
id := ev.GetIDBytes()
|
id := ev.GetIDBytes()
|
||||||
if !utils.FastEqual(id, ev.ID) {
|
if !utils.FastEqual(id, ev.ID) {
|
||||||
log.E.Ln("event ID incorrect")
|
log.E.Ln("event Subscription incorrect")
|
||||||
ev.ID = id
|
ev.ID = id
|
||||||
err = nil
|
err = nil
|
||||||
if valid, err = keys.Verify(ev.ID, ev.Sig); chk.E(err) {
|
if valid, err = keys.Verify(ev.ID, ev.Sig); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = errorf.W("event ID incorrect but signature is valid on correct ID")
|
err = errorf.W("event Subscription incorrect but signature is valid on correct Subscription")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -237,7 +237,7 @@ const (
|
|||||||
//
|
//
|
||||||
// todo: this may tolerate whitespace, not certain currently.
|
// todo: this may tolerate whitespace, not certain currently.
|
||||||
func (f *F) Unmarshal(b []byte) (r []byte, err error) {
|
func (f *F) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
r = b[:]
|
r = b
|
||||||
var key []byte
|
var key []byte
|
||||||
var state int
|
var state int
|
||||||
for ; len(r) > 0; r = r[1:] {
|
for ; len(r) > 0; r = r[1:] {
|
||||||
|
|||||||
Reference in New Issue
Block a user