From faa527756bf05fffb3e6869833754a212229ba23 Mon Sep 17 00:00:00 2001 From: mleku Date: Sat, 30 Aug 2025 14:43:32 +0100 Subject: [PATCH] Add `eventenvelope` codec with support for `Submission` and `Result` envelopes, implement `EstimateSize`, and increase buffer capacity --- .../envelopes/authenvelope/authenvelope.go | 6 + .../envelopes/eventenvelope/eventenvelope.go | 178 ++++++++++++++++++ .../eventenvelope/eventenvelope_test.go | 112 +++++++++++ pkg/encoders/event/event.go | 12 ++ pkg/utils/bufpool/bufpool.go | 2 +- 5 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 pkg/encoders/envelopes/eventenvelope/eventenvelope.go create mode 100644 pkg/encoders/envelopes/eventenvelope/eventenvelope_test.go diff --git a/pkg/encoders/envelopes/authenvelope/authenvelope.go b/pkg/encoders/envelopes/authenvelope/authenvelope.go index 3acb549..bea0b82 100644 --- a/pkg/encoders/envelopes/authenvelope/authenvelope.go +++ b/pkg/encoders/envelopes/authenvelope/authenvelope.go @@ -12,6 +12,7 @@ import ( "next.orly.dev/pkg/encoders/event" text2 "next.orly.dev/pkg/encoders/text" "next.orly.dev/pkg/interfaces/codec" + "next.orly.dev/pkg/utils/units" ) // L is the label associated with this type of codec.Envelope. @@ -194,6 +195,11 @@ func (en *Response) Marshal(dst []byte) (b []byte) { err = errorf.E("nil event in response") return } + // if the destination capacity is not large enough, allocate a new + // destination slice. + if en.Event.EstimateSize() >= cap(dst) { + dst = make([]byte, 0, en.Event.EstimateSize()+units.Kb) + } b = dst b = envs.Marshal(b, L, en.Event.Marshal) _ = err diff --git a/pkg/encoders/envelopes/eventenvelope/eventenvelope.go b/pkg/encoders/envelopes/eventenvelope/eventenvelope.go new file mode 100644 index 0000000..8bf28ad --- /dev/null +++ b/pkg/encoders/envelopes/eventenvelope/eventenvelope.go @@ -0,0 +1,178 @@ +// Package eventenvelope is a codec for the event Submission request EVENT envelope +// (client) and event Result (to a REQ) from a relay. +package eventenvelope + +import ( + "io" + + "lol.mleku.dev/chk" + "lol.mleku.dev/errorf" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/text" + "next.orly.dev/pkg/interfaces/codec" + "next.orly.dev/pkg/utils/bufpool" + "next.orly.dev/pkg/utils/units" +) + +// L is the label associated with this type of codec.Envelope. +const L = "EVENT" + +// Submission is a request from a client for a realy to store an event. +type Submission struct { + *event.E +} + +var _ codec.Envelope = (*Submission)(nil) + +// NewSubmission creates an empty new eventenvelope.Submission. +func NewSubmission() *Submission { return &Submission{E: &event.E{}} } + +// NewSubmissionWith creates a new eventenvelope.Submission with a provided event.E. +func NewSubmissionWith(ev *event.E) *Submission { return &Submission{E: ev} } + +// Label returns the label of a event eventenvelope.Submission envelope. +func (en *Submission) Label() string { return L } + +func (en *Submission) Id() []byte { return en.E.ID } + +// Write the Submission to a provided io.Writer. +func (en *Submission) Write(w io.Writer) (err error) { + _, err = w.Write(en.Marshal(nil)) + return +} + +// Marshal an event Submission envelope in minified JSON, appending to a +// provided destination slice. +func (en *Submission) Marshal(dst []byte) (b []byte) { + var err error + // if the destination capacity is not large enough, allocate a new + // destination slice. + if en.EstimateSize() >= cap(dst) { + dst = make([]byte, 0, en.EstimateSize()+units.Kb) + } + b = dst + b = envelopes.Marshal( + b, L, + func(bst []byte) (o []byte) { + o = bst + o = en.E.Marshal(o) + return + }, + ) + _ = err + return +} + +// Unmarshal an event eventenvelope.Submission from minified JSON, returning the +// remainder after the end of the envelope. +func (en *Submission) Unmarshal(b []byte) (r []byte, err error) { + r = b + en.E = event.New() + if r, err = en.E.Unmarshal(r); chk.T(err) { + return + } + buf := bufpool.Get() + r = en.E.Marshal(buf) + if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { + return + } + return +} + +// ParseSubmission reads an event envelope Submission from minified JSON into a newly +// allocated eventenvelope.Submission. +func ParseSubmission(b []byte) (t *Submission, rem []byte, err error) { + t = NewSubmission() + if rem, err = t.Unmarshal(b); chk.E(err) { + return + } + return +} + +// Result is an event matching a filter associated with a subscription. +type Result struct { + Subscription []byte + Event *event.E +} + +var _ codec.Envelope = (*Result)(nil) + +// NewResult creates a new empty eventenvelope.Result. +func NewResult() *Result { return &Result{} } + +// NewResultWith creates a new eventenvelope.Result with a provided +// subscription.Id string and event.E. +func NewResultWith[V string | []byte](s V, ev *event.E) ( + res *Result, err error, +) { + if len(s) < 0 || len(s) > 64 { + err = errorf.E("subscription id must be length > 0 and <= 64") + return + } + return &Result{[]byte(s), ev}, nil +} + +func (en *Result) Id() []byte { return en.Event.ID } + +// Label returns the label of a event eventenvelope.Result envelope. +func (en *Result) Label() string { return L } + +// Write the eventenvelope.Result to a provided io.Writer. +func (en *Result) Write(w io.Writer) (err error) { + _, err = w.Write(en.Marshal(nil)) + return +} + +// Marshal an eventenvelope.Result envelope in minified JSON, appending to a +// provided destination slice. +func (en *Result) Marshal(dst []byte) (b []byte) { + // if the destination capacity is not large enough, allocate a new + // destination slice. + if en.Event.EstimateSize() >= cap(dst) { + dst = make([]byte, 0, en.Event.EstimateSize()+units.Kb) + } + b = dst + var err error + b = envelopes.Marshal( + b, L, + func(bst []byte) (o []byte) { + o = bst + o = append(o, '"') + o = append(o, en.Subscription...) + o = append(o, '"') + o = append(o, ',') + o = en.Event.Marshal(o) + return + }, + ) + _ = err + return +} + +// Unmarshal an event Result envelope from minified JSON, returning the +// remainder after the end of the envelope. +func (en *Result) Unmarshal(b []byte) (r []byte, err error) { + r = b + if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { + return + } + en.Event = event.New() + if r, err = en.Event.Unmarshal(r); err != nil { + return + } + if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { + return + } + return +} + +// ParseResult allocates a new eventenvelope.Result and unmarshalls an EVENT +// envelope into it. +func ParseResult(b []byte) (t *Result, rem []byte, err error) { + t = NewResult() + if rem, err = t.Unmarshal(b); err != nil { + return + } + return +} diff --git a/pkg/encoders/envelopes/eventenvelope/eventenvelope_test.go b/pkg/encoders/envelopes/eventenvelope/eventenvelope_test.go new file mode 100644 index 0000000..44c130d --- /dev/null +++ b/pkg/encoders/envelopes/eventenvelope/eventenvelope_test.go @@ -0,0 +1,112 @@ +package eventenvelope + +import ( + "bufio" + "bytes" + "testing" + + "lol.mleku.dev/chk" + "next.orly.dev/pkg/encoders/envelopes" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/event/examples" + "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils/bufpool" +) + +func TestSubmission(t *testing.T) { + scanner := bufio.NewScanner(bytes.NewBuffer(examples.Cache)) + var err error + for scanner.Scan() { + c, rem, out := bufpool.Get(), bufpool.Get(), bufpool.Get() + b := scanner.Bytes() + ev := event.New() + if _, err = ev.Unmarshal(b); chk.E(err) { + t.Fatal(err) + } + if len(rem) != 0 { + t.Fatalf( + "some of input remaining after marshal/unmarshal: '%s'", + rem, + ) + } + rem = rem[:0] + ea := NewSubmissionWith(ev) + rem = ea.Marshal(rem) + c = append(c, rem...) + var l string + if l, rem, err = envelopes.Identify(rem); chk.E(err) { + t.Fatal(err) + } + if l != L { + t.Fatalf("invalid sentinel %s, expect %s", l, L) + } + if rem, err = ea.Unmarshal(rem); chk.E(err) { + t.Fatal(err) + } + if len(rem) != 0 { + t.Fatalf( + "some of input remaining after marshal/unmarshal: '%s'", + rem, + ) + } + out = ea.Marshal(out) + if !utils.FastEqual(out, c) { + t.Fatalf("mismatched output\n%s\n\n%s\n", c, out) + } + bufpool.Put(c) + bufpool.Put(rem) + bufpool.Put(out) + } +} + +func TestResult(t *testing.T) { + scanner := bufio.NewScanner(bytes.NewBuffer(examples.Cache)) + var err error + var count int + for scanner.Scan() { + c, rem, out := bufpool.Get(), bufpool.Get(), bufpool.Get() + count++ + b := scanner.Bytes() + ev := event.New() + if _, err = ev.Unmarshal(b); chk.E(err) { + t.Fatal(err) + } + if len(rem) != 0 { + t.Fatalf( + "some of input remaining after marshal/unmarshal: '%s'", + rem, + ) + } + var ea *Result + if ea, err = NewResultWith( + utils.NewSubscription(count), ev, + ); chk.E(err) { + t.Fatal(err) + } + rem = ea.Marshal(rem) + c = append(c, rem...) + var l string + if l, rem, err = envelopes.Identify(rem); chk.E(err) { + t.Fatal(err) + } + if l != L { + t.Fatalf("invalid sentinel %s, expect %s", l, L) + } + if rem, err = ea.Unmarshal(rem); chk.E(err) { + t.Fatal(err) + } + if len(rem) != 0 { + t.Fatalf( + "some of input remaining after marshal/unmarshal: '%s'", + rem, + ) + } + out = ea.Marshal(out) + if !utils.FastEqual(out, c) { + t.Fatalf("mismatched output\n%s\n\n%s\n", c, out) + } + bufpool.Put(c) + bufpool.Put(rem) + bufpool.Put(out) + } +} diff --git a/pkg/encoders/event/event.go b/pkg/encoders/event/event.go index 0d220f9..0345dba 100644 --- a/pkg/encoders/event/event.go +++ b/pkg/encoders/event/event.go @@ -94,6 +94,18 @@ func (ev *E) Free() { ev.b = nil } +// EstimateSize returns a size for the event that allows for worst case scenario +// expansion of the escaped content and tags. +func (ev *E) EstimateSize() (size int) { + size = len(ev.ID)*2 + len(ev.Pubkey)*2 + len(ev.Sig)*2 + len(ev.Content)*2 + for _, v := range *ev.Tags { + for _, w := range (*v).T { + size += len(w) * 2 + } + } + return +} + func (ev *E) Marshal(dst []byte) (b []byte) { b = dst b = append(b, '{') diff --git a/pkg/utils/bufpool/bufpool.go b/pkg/utils/bufpool/bufpool.go index 5d82be2..6edfb4d 100644 --- a/pkg/utils/bufpool/bufpool.go +++ b/pkg/utils/bufpool/bufpool.go @@ -11,7 +11,7 @@ import ( const ( // BufferSize is the size of each buffer in the pool (1kb) - BufferSize = units.Kb / 2 + BufferSize = units.Kb ) type B []byte