diff --git a/pkg/encoders/envelopes/countenvelope/countenvelope.go b/pkg/encoders/envelopes/countenvelope/countenvelope.go new file mode 100644 index 0000000..29fbee5 --- /dev/null +++ b/pkg/encoders/envelopes/countenvelope/countenvelope.go @@ -0,0 +1,206 @@ +// Package countenvelope is an encoder for the COUNT request (client) and +// response (relay) message types. +package countenvelope + +import ( + "bytes" + "io" + + "lol.mleku.dev/chk" + "lol.mleku.dev/errorf" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/ints" + "next.orly.dev/pkg/encoders/text" + "next.orly.dev/pkg/interfaces/codec" +) + +// L is the label associated with this type of codec.Envelope. +const L = "COUNT" + +// Request is a COUNT envelope sent by a client to request a count of results. +// This is a stupid idea because it costs as much processing as fetching the +// events, but doesn't provide the means to actually get them (the HTTP API +// /filter does this by returning the actual event Ids). +type Request struct { + Subscription []byte + Filters filter.S +} + +var _ codec.Envelope = (*Request)(nil) + +// New creates a new Request with a standard style subscription.Id and empty filter. +func New() *Request { return new(Request) } + +// NewRequest creates a new Request with a provided subscription.Id and +// filter.T. +func NewRequest(id []byte, filters filter.S) *Request { + return &Request{ + Subscription: id, + Filters: filters, + } +} + +// Label returns the label of a CLOSED envelope. +func (en *Request) Label() string { return L } + +// Write the Request to a provided io.Writer. +func (en *Request) Write(w io.Writer) (err error) { + var b []byte + b = en.Marshal(b) + _, err = w.Write(b) + return +} + +// Marshal a Request appended to the provided destination slice as minified +// JSON. +func (en *Request) Marshal(dst []byte) (b []byte) { + var err error + b = dst + b = envelopes.Marshal( + b, L, + func(bst []byte) (o []byte) { + o = bst + o = append(o, '"') + o = append(o, en.Subscription...) + o = append(o, '"') + o = append(o, ',') + for _, f := range en.Filters { + o = append(o, ',') + o = f.Marshal(o) + } + return + }, + ) + _ = err + return +} + +// Unmarshal a Request from minified JSON, returning the remainder after the end +// of the envelope. +func (en *Request) Unmarshal(b []byte) (r []byte, err error) { + r = b + if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { + return + } + if r, err = en.Filters.Unmarshal(r); chk.E(err) { + return + } + if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { + return + } + return +} + +// ParseRequest reads a Request in minified JSON into a newly allocated Request. +func ParseRequest(b []byte) (t *Request, rem []byte, err error) { + t = New() + if rem, err = t.Unmarshal(b); chk.E(err) { + return + } + return +} + +// Response is a COUNT Response returning a count and approximate flag +// associated with the REQ subscription.Id. +type Response struct { + Subscription []byte + Count int + Approximate bool +} + +var _ codec.Envelope = (*Response)(nil) + +// NewResponse creates a new empty countenvelope.Response with a standard formatted +// subscription.Id. +func NewResponse() *Response { return new(Response) } + +// NewResponseFrom creates a new countenvelope.Response with provided string for the +// subscription.Id, a count and optional variadic approximate flag, which is +// otherwise false and does not get rendered into the JSON. +func NewResponseFrom[V string | []byte]( + s V, cnt int, + approx ...bool, +) (res *Response, err error) { + var a bool + if len(approx) > 0 { + a = approx[0] + } + if len(s) < 0 || len(s) > 64 { + err = errorf.E("subscription id must be length > 0 and <= 64") + return + } + return &Response{[]byte(s), cnt, a}, nil +} + +// Label returns the COUNT label associated with a Response. +func (en *Response) Label() string { return L } + +// Write a Response to a provided io.Writer as minified JSON. +func (en *Response) Write(w io.Writer) (err error) { + _, err = w.Write(en.Marshal(nil)) + return +} + +// Marshal a countenvelope.Response envelope in minified JSON, appending to a +// provided destination slice. +func (en *Response) Marshal(dst []byte) (b []byte) { + b = dst + b = envelopes.Marshal( + b, L, + func(bst []byte) (o []byte) { + o = bst + o = append(o, '"') + o = append(o, en.Subscription...) + o = append(o, '"') + o = append(o, ',') + c := ints.New(en.Count) + o = c.Marshal(o) + if en.Approximate { + o = append(o, ',') + o = append(o, "true"...) + } + return + }, + ) + return +} + +// Unmarshal a COUNT Response from minified JSON, returning the remainder after +// the end of the envelope. +func (en *Response) Unmarshal(b []byte) (r []byte, err error) { + r = b + if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { + return + } + if r, err = text.Comma(r); chk.E(err) { + return + } + i := ints.New(0) + if r, err = i.Unmarshal(r); chk.E(err) { + return + } + en.Count = int(i.N) + if len(r) > 0 { + if r[0] == ',' { + r = r[1:] + if bytes.HasPrefix(r, []byte("true")) { + en.Approximate = true + } + } + } + if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { + return + } + return +} + +// Parse reads a Count Response in minified JSON into a newly allocated +// countenvelope.Response. +func Parse(b []byte) (t *Response, rem []byte, err error) { + t = NewResponse() + if rem, err = t.Unmarshal(b); chk.E(err) { + return + } + return +} diff --git a/pkg/encoders/envelopes/countenvelope/countenvelope_test.go b/pkg/encoders/envelopes/countenvelope/countenvelope_test.go new file mode 100644 index 0000000..ae7b991 --- /dev/null +++ b/pkg/encoders/envelopes/countenvelope/countenvelope_test.go @@ -0,0 +1,135 @@ +package countenvelope + +import ( + "testing" + + "lol.mleku.dev/chk" + "lukechampine.com/frand" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils/bufpool" +) + +func TestRequest(t *testing.T) { + var err error + for i := range 1000 { + rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get() + var f filter.S + if f, err = filter.GenFilters(); chk.E(err) { + t.Fatal(err) + } + s := utils.NewSubscription(i) + req := NewRequest(s, f) + rb = req.Marshal(rb) + rb1 = append(rb1, rb...) + var rem []byte + var l string + if l, rb, err = envelopes.Identify(rb); chk.E(err) { + t.Fatal(err) + } + if l != L { + t.Fatalf("invalid sentinel %s, expect %s", l, L) + } + req2 := New() + if rem, err = req2.Unmarshal(rb); chk.E(err) { + t.Fatal(err) + } + if len(rem) > 0 { + t.Fatalf( + "unmarshal failed, remainder\n%d %s", + len(rem), rem, + ) + } + rb2 = req2.Marshal(rb2) + if !utils.FastEqual(rb1, rb2) { + if len(rb1) != len(rb2) { + t.Fatalf( + "unmarshal failed, different lengths\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + for i := range rb1 { + if rb1[i] != rb2[i] { + t.Fatalf( + "unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n", + i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i], + rb2[i:], + ) + } + } + t.Fatalf( + "unmarshal failed\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + bufpool.Put(rb1) + bufpool.Put(rb2) + bufpool.Put(rb) + } +} + +func TestResponse(t *testing.T) { + var err error + for i := range 1000 { + rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get() + s := utils.NewSubscription(i) + var res *Response + if i&2 == 0 { + if res, err = NewResponseFrom( + s, frand.Intn(200), true, + ); chk.E(err) { + t.Fatal(err) + } + } else { + if res, err = NewResponseFrom(s, frand.Intn(200)); chk.E(err) { + t.Fatal(err) + } + } + rb = res.Marshal(rb) + rb1 = append(rb1, rb...) + var rem []byte + var l string + if l, rb, err = envelopes.Identify(rb); chk.E(err) { + t.Fatal(err) + } + if l != L { + t.Fatalf("invalid sentinel %s, expect %s", l, L) + } + res2 := NewResponse() + if rem, err = res2.Unmarshal(rb); chk.E(err) { + t.Fatal(err) + } + if len(rem) > 0 { + t.Fatalf( + "unmarshal failed, remainder\n%d %s", + len(rem), rem, + ) + } + rb2 = res2.Marshal(rb2) + if !utils.FastEqual(rb1, rb2) { + if len(rb1) != len(rb2) { + t.Fatalf( + "unmarshal failed, different lengths\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + for i := range rb1 { + if rb1[i] != rb2[i] { + t.Fatalf( + "unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n", + i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i], + rb2[i:], + ) + } + } + t.Fatalf( + "unmarshal failed\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + bufpool.Put(rb1) + bufpool.Put(rb2) + bufpool.Put(rb) + } +} diff --git a/pkg/encoders/envelopes/okenvelope/okenvelope.go b/pkg/encoders/envelopes/okenvelope/okenvelope.go index d4f4a23..8f6fc86 100644 --- a/pkg/encoders/envelopes/okenvelope/okenvelope.go +++ b/pkg/encoders/envelopes/okenvelope/okenvelope.go @@ -41,7 +41,7 @@ func NewFrom[V string | []byte](eid V, ok bool, msg ...V) *T { } if len(eid) != sha256.Size { log.W.F( - "event ID unexpected length, expect %d got %d", + "event Subscription unexpected length, expect %d got %d", len(eid), sha256.Size, ) } @@ -98,7 +98,7 @@ func (en *T) Unmarshal(b []byte) (r []byte, err error) { } if len(idBytes) != sha256.Size { err = errorf.E( - "invalid size for ID, require %d got %d", + "invalid size for Subscription, require %d got %d", sha256.Size, len(idBytes), ) return diff --git a/pkg/encoders/envelopes/reqenvelope/reqenvelope.go b/pkg/encoders/envelopes/reqenvelope/reqenvelope.go index f40ab18..302b462 100644 --- a/pkg/encoders/envelopes/reqenvelope/reqenvelope.go +++ b/pkg/encoders/envelopes/reqenvelope/reqenvelope.go @@ -87,7 +87,6 @@ func (en *T) Unmarshal(b []byte) (r []byte, err error) { if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { return } - if r, err = text.Comma(r); chk.E(err) { return } diff --git a/pkg/encoders/envelopes/reqenvelope/reqenvelope_test.go b/pkg/encoders/envelopes/reqenvelope/reqenvelope_test.go index 5081a91..223cdcc 100644 --- a/pkg/encoders/envelopes/reqenvelope/reqenvelope_test.go +++ b/pkg/encoders/envelopes/reqenvelope/reqenvelope_test.go @@ -7,14 +7,13 @@ import ( "next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils/bufpool" ) func TestMarshalUnmarshal(t *testing.T) { var err error - rb, rb1, rb2 := make([]byte, 0, 65535), make([]byte, 0, 65535), make( - []byte, 0, 65535, - ) for i := range 1000 { + rb, rb1, rb2 := bufpool.Get(), bufpool.Get(), bufpool.Get() var f filter.S if f, err = filter.GenFilters(); chk.E(err) { t.Fatal(err) @@ -22,8 +21,7 @@ func TestMarshalUnmarshal(t *testing.T) { s := utils.NewSubscription(i) req := NewFrom(s, f) rb = req.Marshal(rb) - rb1 = rb1[:len(rb)] - copy(rb1, rb) + rb1 = append(rb1, rb...) var rem []byte var l string if l, rb, err = envelopes.Identify(rb); chk.E(err) { @@ -64,6 +62,8 @@ func TestMarshalUnmarshal(t *testing.T) { len(rb1), rb1, len(rb2), rb2, ) } - rb, rb1, rb2 = rb[:0], rb1[:0], rb2[:0] + bufpool.Put(rb1) + bufpool.Put(rb2) + bufpool.Put(rb) } } diff --git a/pkg/encoders/event/event.go b/pkg/encoders/event/event.go index 0345dba..58b0af8 100644 --- a/pkg/encoders/event/event.go +++ b/pkg/encoders/event/event.go @@ -297,7 +297,7 @@ InVal: } if len(id) != sha256.Size { err = errorf.E( - "invalid ID, require %d got %d", sha256.Size, + "invalid Subscription, require %d got %d", sha256.Size, len(id), ) return diff --git a/pkg/encoders/event/signatures.go b/pkg/encoders/event/signatures.go index 96d2a2c..9fb5fc3 100644 --- a/pkg/encoders/event/signatures.go +++ b/pkg/encoders/event/signatures.go @@ -34,13 +34,13 @@ func (ev *E) Verify() (valid bool, err error) { // check that this isn't because of a bogus ID id := ev.GetIDBytes() if !utils.FastEqual(id, ev.ID) { - log.E.Ln("event ID incorrect") + log.E.Ln("event Subscription incorrect") ev.ID = id err = nil if valid, err = keys.Verify(ev.ID, ev.Sig); chk.E(err) { return } - err = errorf.W("event ID incorrect but signature is valid on correct ID") + err = errorf.W("event Subscription incorrect but signature is valid on correct Subscription") } return } diff --git a/pkg/encoders/filter/filter.go b/pkg/encoders/filter/filter.go index e2050ba..2203705 100644 --- a/pkg/encoders/filter/filter.go +++ b/pkg/encoders/filter/filter.go @@ -237,7 +237,7 @@ const ( // // todo: this may tolerate whitespace, not certain currently. func (f *F) Unmarshal(b []byte) (r []byte, err error) { - r = b[:] + r = b var key []byte var state int for ; len(r) > 0; r = r[1:] {