diff --git a/pkg/encoders/envelopes/eoseenvelope/eoseenvelope.go b/pkg/encoders/envelopes/eoseenvelope/eoseenvelope.go new file mode 100644 index 0000000..bb348da --- /dev/null +++ b/pkg/encoders/envelopes/eoseenvelope/eoseenvelope.go @@ -0,0 +1,86 @@ +// Package eoseenvelope provides an encoder for the EOSE (End Of Stored +// Events) event that signifies that a REQ has found all stored events and +// from here on the request morphs into a subscription, until the limit, if +// requested, or until CLOSE or CLOSED. +package eoseenvelope + +import ( + "io" + + "lol.mleku.dev/chk" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/encoders/text" + "next.orly.dev/pkg/interfaces/codec" +) + +// L is the label associated with this type of codec.Envelope. +const L = "EOSE" + +// T is an EOSE envelope (End of Stored Events), that signals the end of events +// that are stored and the beginning of a subscription. This is necessitated by +// the confusing multiplexing of websockets for multiple requests, and an ugly +// merging of two distinct API calls, filter and subscribe. +type T struct { + Subscription []byte +} + +var _ codec.Envelope = (*T)(nil) + +// New creates a new eoseenvelope.T with a standard form subscription.Id. +func New() *T { + return new(T) +} + +// NewFrom creates a new eoseenvelope.T using a provided subscription.Id. +func NewFrom[V []byte | string](id V) *T { return &T{Subscription: []byte(id)} } + +// Label returns the label of a EOSE envelope. +func (en *T) Label() string { return L } + +// Write the eoseenvelope.T to a provided io.Writer. +func (en *T) Write(w io.Writer) (err error) { + _, err = w.Write(en.Marshal(nil)) + return +} + +// Marshal a eoseenvelope.T envelope in minified JSON, appending to a provided +// destination slice. +func (en *T) Marshal(dst []byte) (b []byte) { + var err error + b = dst + b = envelopes.Marshal( + b, L, + func(bst []byte) (o []byte) { + o = bst + o = append(o, '"') + o = append(o, en.Subscription...) + o = append(o, '"') + return + }, + ) + _ = err + return +} + +// Unmarshal a eoseenvelope.T from minified JSON, returning the remainder after +// the end of the envelope. +func (en *T) Unmarshal(b []byte) (r []byte, err error) { + r = b + if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { + return + } + if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { + return + } + return +} + +// Parse reads a EOSE envelope in minified JSON into a newly allocated +// eoseenvelope.T. +func Parse(b []byte) (t *T, rem []byte, err error) { + t = New() + if rem, err = t.Unmarshal(b); chk.E(err) { + return + } + return +} diff --git a/pkg/encoders/envelopes/eoseenvelope/eoseenvelope_test.go b/pkg/encoders/envelopes/eoseenvelope/eoseenvelope_test.go new file mode 100644 index 0000000..fd82c60 --- /dev/null +++ b/pkg/encoders/envelopes/eoseenvelope/eoseenvelope_test.go @@ -0,0 +1,66 @@ +package eoseenvelope + +import ( + "testing" + + "lol.mleku.dev/chk" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/utils" +) + +func TestMarshalUnmarshal(t *testing.T) { + var err error + rb, rb1, rb2 := make([]byte, 0, 65535), make([]byte, 0, 65535), make( + []byte, 0, 65535, + ) + for i := range 1000 { + s := utils.NewSubscription(i) + req := NewFrom(s) + rb = req.Marshal(rb) + // log.I.Ln(req.ID) + rb1 = rb1[:len(rb)] + copy(rb1, rb) + var rem []byte + var l string + if l, rb, err = envelopes.Identify(rb); chk.E(err) { + t.Fatal(err) + } + if l != L { + t.Fatalf("invalid sentinel %s, expect %s", l, L) + } + req2 := New() + if rem, err = req2.Unmarshal(rb); chk.E(err) { + t.Fatal(err) + } + // log.I.Ln(req2.ID) + if len(rem) > 0 { + t.Fatalf( + "unmarshal failed, remainder\n%d %s", + len(rem), rem, + ) + } + rb2 = req2.Marshal(rb2) + if !utils.FastEqual(rb1, rb2) { + if len(rb1) != len(rb2) { + t.Fatalf( + "unmarshal failed, different lengths\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + for i := range rb1 { + if rb1[i] != rb2[i] { + t.Fatalf( + "unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n", + i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i], + rb2[i:], + ) + } + } + t.Fatalf( + "unmarshal failed\n%d %s\n%d %s\n", + len(rb1), rb1, len(rb2), rb2, + ) + } + rb, rb1, rb2 = rb[:0], rb1[:0], rb2[:0] + } +} diff --git a/pkg/utils/subscription.go b/pkg/utils/subscription.go new file mode 100644 index 0000000..1f5fd4f --- /dev/null +++ b/pkg/utils/subscription.go @@ -0,0 +1,9 @@ +package utils + +import ( + "fmt" +) + +func NewSubscription(n int) []byte { + return []byte(fmt.Sprintf("sub:%d", n)) +}