Compare commits

...

15 Commits

Author SHA1 Message Date
9b7e8d28de remove redundant return 2025-07-22 15:02:46 +01:00
c16ee76638 add error message for failed fetch 2025-07-22 14:55:09 +01:00
132fdc9f36 Add test for NostrEscape and NostrUnescape handling in JSON tags
- Introduced a new test case `ExplicitlyEscapedJSON` in `json_tags_test.go`
- Validates the behavior of `NostrEscape` and `NostrUnescape` with explicitly escaped JSON
- Ensures proper marshaling, unmarshaling, and verification of nested and special characters in JSON tags
2025-07-22 13:16:09 +01:00
4f1d48c247 Refactor privilege check logs, enhance export functionality, and add new test coverage
- Updated privilege check log messages in `publisher.go` and `handleReq.go` for better clarity and consistency
- Improved event serialization by replacing `WriteTo` with `Serialize` in `export.go`
- Added unit tests for `Export` functionality in `export_test.go`
- Introduced tests for JSON tag handling in `json_tags_test.go`
- Simplified condition in `spider-fetch.go` by removing redundant checks
2025-07-22 12:47:03 +01:00
651791aec1 Add export functionality and fix privilege checks
- Added `Export` method in `database/database.go` to export events to an io.Writer
- Implemented detailed logic for exporting all or specific pubkeys' events
- Removed placeholder `Export` function with TODO comment from `database/database.go`
- Updated error handling in `handleReq.go` and `publisher.go` by using `err != nil` instead of `chk.E(err)`
- Added more detailed logging in privilege check conditions in both `publisher.go` and `handleReq.go`
- Introduced new imports such as `"fmt"` in `connection.go` for improved error message formatting
- Created a new file `export.go` under the `database` package with complete implementation of export functionality
2025-07-22 11:34:57 +01:00
53d649c64e Merge remote-tracking branch 'origin/main' 2025-07-21 15:05:19 +01:00
4dafab3fd6 Fix privilege check logic by inverting conditionals
- Inverted conditional in `publisher.go` to correctly skip unprivileged events
- Inverted conditional in `handleReq.go` to filter out unprivileged events properly
- Added fallback logic in `check-privilege.go` to mark as privileged if no conditions match
2025-07-21 15:05:07 +01:00
f2475c48b7 Fix privilege check logic in publisher and handleReq files
- Changed `auth.CheckPrivilege` condition from allowing to denying access if not privileged in `publisher.go`
- Updated `handleReq.go` to deny access if `auth.CheckPrivilege` returns false, improving consistency and security checks
2025-07-21 14:29:46 +01:00
b5448f4153 Add remote address to log and fix NIP20 prefix handling
- Updated `handleMessage.go` to include the real remote address in the log message when a message is received
- Removed `regexp` import and `NIP20prefixmatcher` variable from `publisher.go`
- Modified `server.go` to remove an unused parameter from the `AddEvent` method
- Added `NIP20prefixmatcher` variable and used it for checking error messages in `addEvent.go`
2025-07-21 13:35:10 +01:00
11d318d4e3 Update Challenge methods with detailed comments and improved functionality
- Added comprehensive documentation for Write, Marshal, Unmarshal, and ParseChallenge methods
- Improved method descriptions with parameters, return values, and expected behavior sections
- Enhanced clarity of implementation details in comments
- Standardized comment formatting across the file
2025-07-21 13:14:59 +01:00
53e8e160dd Fix check-privilege logic and remove redundant condition
- Removed unused `authedIsAuthor` variable and simplified privilege check logic in `check-privilege.go`
- Replaced conditional return based on `authedIsAuthor` with direct use of `privileged` flag
- Simplified the logic for checking if authed pubkey is mentioned in event tags
2025-07-21 11:36:57 +01:00
90c9198ebe Fix pluralization and add background fetch of metadata and relay lists
- Updated log message to use correct plurals for `owners`, `pubkey`, and related variables
- Added background fetching of profile metadata, relay list metadata, and DM relays list using `SpiderFetch` in the goroutine
- Modified `server.go` to import `"orly.dev/pkg/protocol/socketapi"` correctly and initialize `listeners` with the updated constructor
2025-07-20 23:57:41 +01:00
4bbbbb1bb6 Add DMRelaysList.K to kind.go and improve error messages with remote address in connection.go
- Added `DMRelaysList.K` constant to `pkg/encoders/kind/kind.go`
- Removed unused `"errors"` import from `pkg/protocol/ws/connection.go`
- Updated error messages in `WriteMessage`, `ReadMessage`, and related functions in `pkg/protocol/ws/connection.go` to include the remote address for better debugging
- Changed error handling from `chk.E(err)` to `chk.T(err)` in `pkg/encoders/envelopes/eventenvelope/eventenvelope.go`
- Updated ticker interval from 30 minutes to 1 hour in `pkg/app/relay/server.go`
2025-07-20 22:03:30 +01:00
56ab6eaa81 Remove redundant logging statements
- Removed log statement in `pkg/app/relay/spider-fetch.go` for batch processing
- Removed log level output in `main.go`
- Removed configuration logging in `pkg/app/config/config.go`
- Removed unused log import in `pkg/protocol/ws/subscription.go`
- Removed connection success log in `pkg/protocol/ws/client.go`
- Added early return condition for non-follow-list kind in `spider-fetch.go`
2025-07-20 21:52:04 +01:00
e3c931fcf9 Make spider fetch concurrent with mutex and waitgroup
- Added goroutines and sync primitives to handle concurrent relay connections in spider-fetch.go
- Added connection logging to client.go
2025-07-20 21:42:35 +01:00
29 changed files with 1183 additions and 268 deletions

View File

@@ -42,7 +42,6 @@ func main() {
config.PrintHelp(cfg, os.Stderr) config.PrintHelp(cfg, os.Stderr)
os.Exit(0) os.Exit(0)
} }
log.I.Ln("log level", cfg.LogLevel)
lol.SetLogLevel(cfg.LogLevel) lol.SetLogLevel(cfg.LogLevel)
if cfg.Pprof { if cfg.Pprof {
defer profile.Start(profile.MemProfile).Stop() defer profile.Start(profile.MemProfile).Stop()

View File

@@ -87,7 +87,6 @@ func New() (cfg *C, err error) {
lol.SetLogLevel(cfg.LogLevel) lol.SetLogLevel(cfg.LogLevel)
log.I.F("loaded configuration from %s", envPath) log.I.F("loaded configuration from %s", envPath)
} }
log.I.S(cfg)
return return
} }

View File

@@ -3,16 +3,20 @@ package relay
import ( import (
"errors" "errors"
"net/http" "net/http"
"regexp"
"strings" "strings"
"orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/event"
"orly.dev/pkg/interfaces/relay" "orly.dev/pkg/interfaces/relay"
"orly.dev/pkg/interfaces/store" "orly.dev/pkg/interfaces/store"
"orly.dev/pkg/protocol/socketapi"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/normalize" "orly.dev/pkg/utils/normalize"
) )
var (
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
)
// AddEvent processes an incoming event, saves it if valid, and delivers it to // AddEvent processes an incoming event, saves it if valid, and delivers it to
// subscribers. // subscribers.
// //
@@ -50,9 +54,7 @@ import (
// - Returns a boolean indicating whether the event was accepted and any // - Returns a boolean indicating whether the event was accepted and any
// relevant message. // relevant message.
func (s *Server) AddEvent( func (s *Server) AddEvent(
c context.T, rl relay.I, ev *event.E, c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
hr *http.Request, origin string,
authedPubkey []byte,
) (accepted bool, message []byte) { ) (accepted bool, message []byte) {
if ev == nil { if ev == nil {
@@ -65,9 +67,12 @@ func (s *Server) AddEvent(
return false, []byte(saveErr.Error()) return false, []byte(saveErr.Error())
} }
errmsg := saveErr.Error() errmsg := saveErr.Error()
if socketapi.NIP20prefixmatcher.MatchString(errmsg) { if NIP20prefixmatcher.MatchString(errmsg) {
if strings.Contains(errmsg, "tombstone") { if strings.Contains(errmsg, "tombstone") {
return false, normalize.Error.F("event was deleted, not storing it again") return false, normalize.Error.F(
"%s event was deleted, not storing it again",
origin,
)
} }
if strings.HasPrefix(errmsg, string(normalize.Blocked)) { if strings.HasPrefix(errmsg, string(normalize.Blocked)) {
return false, []byte(errmsg) return false, []byte(errmsg)

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"orly.dev/pkg/protocol/socketapi"
"strconv" "strconv"
"time" "time"
@@ -15,7 +16,6 @@ import (
"orly.dev/pkg/app/relay/publish" "orly.dev/pkg/app/relay/publish"
"orly.dev/pkg/interfaces/relay" "orly.dev/pkg/interfaces/relay"
"orly.dev/pkg/protocol/servemux" "orly.dev/pkg/protocol/servemux"
"orly.dev/pkg/protocol/socketapi"
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/log" "orly.dev/pkg/utils/log"
@@ -90,15 +90,15 @@ func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) {
} }
serveMux := servemux.NewServeMux() serveMux := servemux.NewServeMux()
s = &Server{ s = &Server{
Ctx: sp.Ctx, Ctx: sp.Ctx,
Cancel: sp.Cancel, Cancel: sp.Cancel,
relay: sp.Rl, relay: sp.Rl,
mux: serveMux, mux: serveMux,
options: op, options: op,
listeners: publish.New(socketapi.New()), C: sp.C,
C: sp.C, Lists: new(Lists),
Lists: new(Lists),
} }
s.listeners = publish.New(socketapi.New(s))
go func() { go func() {
if err := s.relay.Init(); chk.E(err) { if err := s.relay.Init(); chk.E(err) {
s.Shutdown() s.Shutdown()
@@ -191,7 +191,7 @@ func (s *Server) Start(
} }
} }
// start up a spider run to trigger every 30 minutes // start up a spider run to trigger every 30 minutes
ticker := time.NewTicker(30 * time.Minute) ticker := time.NewTicker(time.Hour)
go func() { go func() {
for { for {
select { select {

View File

@@ -5,7 +5,6 @@ import (
"orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/filter" "orly.dev/pkg/encoders/filter"
"orly.dev/pkg/encoders/hex" "orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/kinds" "orly.dev/pkg/encoders/kinds"
"orly.dev/pkg/encoders/tag" "orly.dev/pkg/encoders/tag"
"orly.dev/pkg/protocol/ws" "orly.dev/pkg/protocol/ws"
@@ -13,15 +12,16 @@ import (
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/log" "orly.dev/pkg/utils/log"
"sort" "sort"
"sync"
) )
func (s *Server) SpiderFetch( func (s *Server) SpiderFetch(
k *kind.T, noFetch bool, pubkeys ...[]byte, k *kinds.T, noFetch, noExtract bool, pubkeys ...[]byte,
) (pks [][]byte, err error) { ) (pks [][]byte, err error) {
// first search the local database // first search the local database
pkList := tag.New(pubkeys...) pkList := tag.New(pubkeys...)
f := &filter.F{ f := &filter.F{
Kinds: kinds.New(k), Kinds: k,
Authors: pkList, Authors: pkList,
} }
var evs event.S var evs event.S
@@ -29,54 +29,83 @@ func (s *Server) SpiderFetch(
// none were found, so we need to scan the spiders // none were found, so we need to scan the spiders
err = nil err = nil
} }
if len(evs) < len(pubkeys) && !noFetch { var kindsList string
for i, kk := range k.K {
if i > 0 {
kindsList += ","
}
kindsList += kk.Name()
}
log.I.F("%d events found of type %s", len(evs), kindsList)
// for _, ev := range evs {
// o += fmt.Sprintf("%s\n\n", ev.Marshal(nil))
// }
// log.I.F("%s", o)
if !noFetch {
// we need to search the spider seeds. // we need to search the spider seeds.
// Break up pubkeys into batches of 512 // Break up pubkeys into batches of 128
log.I.F("breaking up %d pubkeys into batches of 512", len(pubkeys)) for i := 0; i < len(pubkeys); i += 128 {
for i := 0; i < len(pubkeys); i += 512 { end := i + 128
end := i + 512
if end > len(pubkeys) { if end > len(pubkeys) {
end = len(pubkeys) end = len(pubkeys)
} }
batchPubkeys := pubkeys[i:end] batchPubkeys := pubkeys[i:end]
log.I.F( log.I.F(
"processing batch %d to %d of %d pubkeys", i, end, len(pubkeys), "processing batch %d to %d of %d for kind %s",
i, end, len(pubkeys), kindsList,
) )
batchPkList := tag.New(batchPubkeys...) batchPkList := tag.New(batchPubkeys...)
lim := uint(batchPkList.Len())
batchFilter := &filter.F{ batchFilter := &filter.F{
Kinds: kinds.New(k), Kinds: k,
Authors: batchPkList, Authors: batchPkList,
Limit: &lim,
} }
var mx sync.Mutex
var wg sync.WaitGroup
for _, seed := range s.C.SpiderSeeds { for _, seed := range s.C.SpiderSeeds {
select { wg.Add(1)
case <-s.Ctx.Done(): go func() {
return defer wg.Done()
default: select {
} case <-s.Ctx.Done():
var evss event.S return
var cli *ws.Client default:
if cli, err = ws.RelayConnect(context.Bg(), seed); chk.E(err) { }
err = nil var evss event.S
continue var cli *ws.Client
} if cli, err = ws.RelayConnect(
if evss, err = cli.QuerySync( context.Bg(), seed,
context.Bg(), batchFilter, ); chk.E(err) {
); chk.E(err) { err = nil
err = nil return
continue }
} if evss, err = cli.QuerySync(
for _, ev := range evss { context.Bg(), batchFilter,
evs = append(evs, ev) ); chk.E(err) {
} err = nil
} return
} }
// save the events to the database mx.Lock()
for _, ev := range evs { // save the events to the database
if _, _, err = s.Storage().SaveEvent(s.Ctx, ev); chk.E(err) { for _, ev := range evss {
err = nil log.I.F("saving event:\n%s", ev.Marshal(nil))
continue if _, _, err = s.Storage().SaveEvent(
s.Ctx, ev,
); chk.E(err) {
err = nil
continue
}
}
for _, ev := range evss {
evs = append(evs, ev)
}
mx.Unlock()
}()
} }
wg.Wait()
} }
} }
// deduplicate and take the newest // deduplicate and take the newest
@@ -95,7 +124,10 @@ func (s *Server) SpiderFetch(
tmp = append(tmp, evm[0]) tmp = append(tmp, evm[0])
} }
evs = tmp evs = tmp
// we have all we're going to get now // we have all we're going to get now, extract the p tags
if noExtract {
return
}
pkMap := make(map[string]struct{}) pkMap := make(map[string]struct{})
for _, ev := range evs { for _, ev := range evs {
t := ev.Tags.GetAll(tag.New("p")) t := ev.Tags.GetAll(tag.New("p"))
@@ -105,7 +137,7 @@ func (s *Server) SpiderFetch(
continue continue
} }
pk := make([]byte, schnorr.PubKeyBytesLen) pk := make([]byte, schnorr.PubKeyBytesLen)
if _, err = hex.DecBytes(pk, pkh); chk.E(err) { if _, err = hex.DecBytes(pk, pkh); err != nil {
err = nil err = nil
continue continue
} }

View File

@@ -6,6 +6,7 @@ import (
"orly.dev/pkg/encoders/bech32encoding" "orly.dev/pkg/encoders/bech32encoding"
"orly.dev/pkg/encoders/hex" "orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/kind" "orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/kinds"
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log" "orly.dev/pkg/utils/log"
) )
@@ -44,90 +45,91 @@ func (s *Server) Spider(noFetch ...bool) (err error) {
// there is no OwnersPubkeys, so there is nothing to do. // there is no OwnersPubkeys, so there is nothing to do.
return return
} }
dontFetch := false go func() {
if len(noFetch) > 0 && noFetch[0] { dontFetch := false
dontFetch = true if len(noFetch) > 0 && noFetch[0] {
} dontFetch = true
log.I.F("getting ownersFollowed")
var ownersFollowed [][]byte
if ownersFollowed, err = s.SpiderFetch(
kind.FollowList, dontFetch, ownersPubkeys...,
); chk.E(err) {
return
}
log.I.F("getting followedFollows")
var followedFollows [][]byte
if followedFollows, err = s.SpiderFetch(
kind.FollowList, dontFetch, ownersFollowed...,
); chk.E(err) {
return
}
log.I.F("getting ownersMuted")
var ownersMuted [][]byte
if ownersMuted, err = s.SpiderFetch(
kind.MuteList, dontFetch, ownersPubkeys...,
); chk.E(err) {
return
}
// remove the ownersFollowed and ownersMuted items from the followedFollows
// list
filteredFollows := make([][]byte, 0, len(followedFollows))
for _, follow := range followedFollows {
found := false
for _, owner := range ownersFollowed {
if bytes.Equal(follow, owner) {
found = true
break
}
} }
for _, owner := range ownersMuted { log.I.F("getting ownersFollowed")
if bytes.Equal(follow, owner) { var ownersFollowed [][]byte
found = true if ownersFollowed, err = s.SpiderFetch(
break kinds.New(kind.FollowList), dontFetch, false, ownersPubkeys...,
} ); chk.E(err) {
return
} }
if !found { // log.I.S(ownersFollowed)
log.I.F("getting followedFollows")
var followedFollows [][]byte
if followedFollows, err = s.SpiderFetch(
kinds.New(kind.FollowList), dontFetch, false, ownersFollowed...,
); chk.E(err) {
return
}
log.I.F("getting ownersMuted")
var ownersMuted [][]byte
if ownersMuted, err = s.SpiderFetch(
kinds.New(kind.MuteList), dontFetch, false, ownersPubkeys...,
); chk.E(err) {
return
}
// remove the ownersFollowed and ownersMuted items from the followedFollows
// list
filteredFollows := make([][]byte, 0, len(followedFollows))
for _, follow := range followedFollows {
for _, owner := range ownersFollowed {
if bytes.Equal(follow, owner) {
break
}
}
for _, owner := range ownersMuted {
if bytes.Equal(follow, owner) {
break
}
}
filteredFollows = append(filteredFollows, follow) filteredFollows = append(filteredFollows, follow)
} }
} followedFollows = filteredFollows
followedFollows = filteredFollows own := "owner"
own := "owner" if len(ownersPubkeys) > 1 {
if len(ownersPubkeys) > 1 { own = "owners"
own = "owners" }
} fol := "pubkey"
fol := "pubkey" if len(ownersFollowed) > 1 {
if len(ownersFollowed) > 1 { fol = "pubkeys"
fol = "pubkeys" }
} folfol := "pubkey"
folfol := "pubkey" if len(followedFollows) > 1 {
if len(followedFollows) > 1 { folfol = "pubkeys"
folfol = "pubkeys" }
} mut := "pubkey"
mut := "pubkey" if len(ownersMuted) > 1 {
if len(ownersMuted) > 1 { mut = "pubkeys"
mut = "pubkeys" }
} log.T.F(
log.T.F( "found %d %s with a total of %d followed %s and %d followed's follows %s, and excluding %d owner muted %s",
"found %d %s with a total of %d followed %s and %d followed's follows %s, and excluding %d owner muted %s", len(ownersPubkeys), own,
len(ownersPubkeys), own, len(ownersFollowed), fol,
len(ownersFollowed), fol, len(followedFollows), folfol,
len(followedFollows), folfol, len(ownersMuted), mut,
len(ownersMuted), mut, )
) // add the owners to the ownersFollowed
// add the owners ownersFollowed = append(ownersFollowed, ownersPubkeys...)
ownersFollowed = append(ownersFollowed, ownersPubkeys...) s.SetOwnersPubkeys(ownersPubkeys)
s.SetOwnersPubkeys(ownersPubkeys) s.SetOwnersFollowed(ownersFollowed)
s.SetOwnersFollowed(ownersFollowed) s.SetFollowedFollows(followedFollows)
s.SetFollowedFollows(followedFollows) s.SetOwnersMuted(ownersMuted)
s.SetOwnersMuted(ownersMuted) // lastly, update users profile metadata and relay lists in the background
// lastly, update users profile metadata and relay lists in the background if !dontFetch {
if !dontFetch { go func() {
go func() { everyone := append(ownersFollowed, followedFollows...)
everyone := append(ownersFollowed, followedFollows...) s.SpiderFetch(
s.SpiderFetch(kind.ProfileMetadata, false, everyone...) kinds.New(
s.SpiderFetch(kind.RelayListMetadata, false, everyone...) kind.ProfileMetadata, kind.RelayListMetadata,
s.SpiderFetch(kind.DMRelaysList, false, everyone...) kind.DMRelaysList,
}() ), false, true, everyone...,
} )
}()
}
}()
return return
} }

View File

@@ -80,11 +80,6 @@ func (d *D) Import(r io.Reader) {
panic("implement me") panic("implement me")
} }
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
// TODO implement me
panic("implement me")
}
func (d *D) SetLogLevel(level string) { func (d *D) SetLogLevel(level string) {
d.Logger.SetLogLevel(lol.GetLogLevel(level)) d.Logger.SetLogLevel(lol.GetLogLevel(level))
} }

98
pkg/database/export.go Normal file
View File

@@ -0,0 +1,98 @@
package database
import (
"bytes"
"github.com/dgraph-io/badger/v4"
"io"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/units"
)
// Export writes stored events to w as line-structured, minified JSON, one
// event per line.
//
// If no pubkeys are given, the complete database of events is exported by
// walking the event index. Otherwise only events authored by the listed
// pubkeys are exported, one pubkey-index prefix scan each.
//
// Errors on individual events (copy, decode, or write failures) are logged
// via chk and the event is skipped, so a partial export is still produced;
// errors preparing an index prefix abort the export.
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
	if len(pubkeys) == 0 {
		// Export everything under the event index prefix.
		buf := codecbuf.Get()
		defer codecbuf.Put(buf)
		if err := indexes.EventEnc(nil).MarshalWrite(buf); chk.E(err) {
			return
		}
		_ = d.exportPrefix(w, buf.Bytes())
		return
	}
	for _, pubkey := range pubkeys {
		pkBuf := codecbuf.Get()
		ph := &types.PubHash{}
		if err := ph.FromPubkey(pubkey); chk.E(err) {
			codecbuf.Put(pkBuf)
			return
		}
		if err := indexes.PubkeyEnc(
			ph, nil, nil,
		).MarshalWrite(pkBuf); chk.E(err) {
			codecbuf.Put(pkBuf)
			return
		}
		err := d.exportPrefix(w, pkBuf.Bytes())
		// Release the prefix buffer per iteration rather than deferring, so
		// buffers are not held for the whole multi-pubkey export.
		codecbuf.Put(pkBuf)
		if err != nil {
			return
		}
	}
}

// exportPrefix iterates every key under prefix in a read-only transaction,
// decodes each value as a binary event, and writes its minified JSON
// serialization followed by a newline to w.
//
// Per-event failures are logged and skipped; they deliberately do not fail
// the transaction, so a corrupt or unwritable event cannot abort the export
// (previously a stale err from the final item leaked into the closure's
// named return).
func (d *D) exportPrefix(w io.Writer, prefix []byte) error {
	return d.View(
		func(txn *badger.Txn) (err error) {
			it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
			defer it.Close()
			// Scratch buffer reused across items to limit allocation.
			evB := make([]byte, 0, units.Mb)
			for it.Rewind(); it.Valid(); it.Next() {
				item := it.Item()
				if evB, err = item.ValueCopy(evB); chk.E(err) {
					continue
				}
				ev := event.New()
				if err = ev.UnmarshalBinary(bytes.NewBuffer(evB)); chk.E(err) {
					continue
				}
				// Serialize the event to JSON and write it to the output.
				if _, err = w.Write(ev.Serialize()); chk.E(err) {
					continue
				}
				if _, err = w.Write([]byte{'\n'}); chk.E(err) {
					continue
				}
			}
			// Skipped events are non-fatal; the scan itself succeeded.
			return nil
		},
	)
}

111
pkg/database/export_test.go Normal file
View File

@@ -0,0 +1,111 @@
package database
import (
"bufio"
"bytes"
"os"
"testing"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/event/examples"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context"
)
// TestExport exercises Export by populating a fresh database with every
// event from examples.Cache, exporting the whole database, and confirming
// that each stored event ID appears in the exported line-delimited JSON.
func TestExport(t *testing.T) {
	// Temporary directory backing the database; removed when the test ends.
	dbDir, err := os.MkdirTemp("", "test-db-*")
	if err != nil {
		t.Fatalf("Failed to create temporary directory: %v", err)
	}
	defer os.RemoveAll(dbDir)
	// Context governing the database lifetime.
	ctx, cancel := context.Cancel(context.Bg())
	defer cancel()
	// Open the database under the temporary directory.
	db, err := New(ctx, cancel, dbDir, "info")
	if err != nil {
		t.Fatalf("Failed to create database: %v", err)
	}
	defer db.Close()
	// Feed every example event into the database, recording its ID and the
	// mapping from author pubkey to that author's event IDs.
	in := bufio.NewScanner(bytes.NewBuffer(examples.Cache))
	in.Buffer(make([]byte, 0, 1_000_000_000), 1_000_000_000)
	eventIDs := make(map[string]bool)
	pubkeyToEventIDs := make(map[string][]string)
	for in.Scan() {
		chk.E(in.Err())
		ev := event.New()
		if _, err = ev.Unmarshal(in.Bytes()); chk.E(err) {
			t.Fatal(err)
		}
		if _, _, err = db.SaveEvent(ctx, ev); err != nil {
			t.Fatalf("Failed to save event: %v", err)
		}
		id := ev.IdString()
		eventIDs[id] = true
		pk := ev.PubKeyString()
		pubkeyToEventIDs[pk] = append(pubkeyToEventIDs[pk], id)
	}
	if err = in.Err(); err != nil {
		t.Fatalf("Scanner error: %v", err)
	}
	t.Logf("Saved %d events to the database", len(eventIDs))
	// Export the complete database and collect the IDs that come back out.
	var out bytes.Buffer
	db.Export(ctx, &out)
	seen := make(map[string]bool)
	outScan := bufio.NewScanner(&out)
	outScan.Buffer(make([]byte, 0, 1_000_000_000), 1_000_000_000)
	var total int
	for outScan.Scan() {
		ev := event.New()
		if _, err = ev.Unmarshal(outScan.Bytes()); chk.E(err) {
			t.Fatal(err)
		}
		seen[ev.IdString()] = true
		total++
	}
	if err = outScan.Err(); err != nil {
		t.Fatalf("Scanner error: %v", err)
	}
	t.Logf("Found %d events in the export", total)
	// Every stored event must be present in the export.
	for id := range eventIDs {
		if !seen[id] {
			t.Errorf("Event ID %s not found in export", id)
		}
	}
	t.Logf("All %d event IDs found in export", len(eventIDs))
}

View File

@@ -8,6 +8,7 @@ import (
"orly.dev/pkg/encoders/codecbuf" "orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/interfaces/store" "orly.dev/pkg/interfaces/store"
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/errorf"
) )
func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) ( func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
@@ -53,5 +54,11 @@ func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
); chk.E(err) { ); chk.E(err) {
return return
} }
if fidpk != nil {
err = errorf.E(
"failed to fetch full id pubkey by serial %d",
ser.Get(),
)
}
return return
} }

View File

@@ -32,7 +32,7 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
} }
// fetch the events // fetch the events
var ev *event.E var ev *event.E
if ev, err = d.FetchEventBySerial(ser); chk.E(err) { if ev, err = d.FetchEventBySerial(ser); err != nil {
continue continue
} }
evs = append(evs, ev) evs = append(evs, ev)
@@ -46,7 +46,6 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
} else { } else {
var idPkTs []store.IdPkTs var idPkTs []store.IdPkTs
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) { if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
return
} }
// Create a map to store the latest version of replaceable events // Create a map to store the latest version of replaceable events

View File

@@ -16,7 +16,6 @@ import (
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/errorf" "orly.dev/pkg/utils/errorf"
"orly.dev/pkg/utils/log"
"sort" "sort"
) )
@@ -38,7 +37,6 @@ func (d *D) SaveEvent(c context.T, ev *event.E) (kc, vc int, err error) {
DTag: t.Value(), DTag: t.Value(),
} }
at := a.Marshal(nil) at := a.Marshal(nil)
log.I.S(at)
if idxs, err = GetIndexesFromFilter( if idxs, err = GetIndexesFromFilter(
&filter.F{ &filter.F{
Authors: tag.New(ev.Pubkey), Authors: tag.New(ev.Pubkey),

View File

@@ -35,7 +35,20 @@ func NewChallengeWith[V string | []byte](challenge V) *Challenge {
// Label returns the label of a authenvelope.Challenge. // Label returns the label of a authenvelope.Challenge.
func (en *Challenge) Label() string { return L } func (en *Challenge) Label() string { return L }
// Write the authenvelope.Challenge to a provided io.Writer. // Write encodes and writes the Challenge instance to the provided writer.
//
// # Parameters
//
// - w (io.Writer): The destination where the encoded data will be written.
//
// # Return Values
//
// - err (error): An error if writing to the writer fails.
//
// # Expected behaviour
//
// Encodes the Challenge instance into a byte slice using Marshal, logs the
// encoded challenge, and writes it to the provided io.Writer.
func (en *Challenge) Write(w io.Writer) (err error) { func (en *Challenge) Write(w io.Writer) (err error) {
var b []byte var b []byte
b = en.Marshal(b) b = en.Marshal(b)
@@ -44,8 +57,26 @@ func (en *Challenge) Write(w io.Writer) (err error) {
return return
} }
// Marshal a authenvelope.Challenge to minified JSON, appending to a provided destination // Marshal encodes the Challenge instance into a byte slice, formatting it as
// slice. Note that this ensures correct string escaping on the challenge field. // a JSON-like structure with a specific label and escaping rules applied to
// its content.
//
// # Parameters
//
// - dst ([]byte): The destination buffer where the encoded data will be written.
//
// # Return Values
//
// - b ([]byte): The byte slice containing the encoded Challenge data.
//
// # Expected behaviour
//
// - Prepares the destination buffer and applies a label to it.
//
// - Escapes the challenge content according to Nostr-specific rules before
// appending it to the output.
//
// - Returns the resulting byte slice with the complete encoded structure.
func (en *Challenge) Marshal(dst []byte) (b []byte) { func (en *Challenge) Marshal(dst []byte) (b []byte) {
b = dst b = dst
var err error var err error
@@ -63,9 +94,24 @@ func (en *Challenge) Marshal(dst []byte) (b []byte) {
return return
} }
// Unmarshal a authenvelope.Challenge from minified JSON, returning the remainder after the // Unmarshal parses the provided byte slice and extracts the challenge value,
// end of the envelope. Note that this ensures the challenge string was // leaving any remaining bytes after parsing.
// correctly escaped by NIP-01 escaping rules. //
// # Parameters
//
// - b ([]byte): The byte slice containing the encoded challenge data.
//
// # Return Values
//
// - r ([]byte): Any remaining bytes after parsing the challenge.
//
// - err (error): An error if parsing fails.
//
// # Expected behaviour
//
// - Extracts the quoted challenge string from the input byte slice.
//
// - Trims any trailing characters following the closing quote.
func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) { func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) {
r = b r = b
if en.Challenge, r, err = text2.UnmarshalQuoted(r); chk.E(err) { if en.Challenge, r, err = text2.UnmarshalQuoted(r); chk.E(err) {
@@ -80,8 +126,26 @@ func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) {
return return
} }
// ParseChallenge reads a authenvelope.Challenge encoded in minified JSON and unpacks it to // ParseChallenge parses the provided byte slice into a new Challenge instance,
// the runtime format. // extracting the challenge value and returning any remaining bytes after parsing.
//
// # Parameters
//
// - b ([]byte): The byte slice containing the encoded challenge data.
//
// # Return Values
//
// - t (*Challenge): A pointer to the newly created and populated Challenge
// instance.
//
// - rem ([]byte): Any remaining bytes in the input slice after parsing.
//
// - err (error): An error if parsing fails.
//
// # Expected behaviour
//
// Parses the byte slice into a new Challenge instance using Unmarshal,
// returning any remaining bytes and an error if parsing fails.
func ParseChallenge(b []byte) (t *Challenge, rem []byte, err error) { func ParseChallenge(b []byte) (t *Challenge, rem []byte, err error) {
t = NewChallenge() t = NewChallenge()
if rem, err = t.Unmarshal(b); chk.E(err) { if rem, err = t.Unmarshal(b); chk.E(err) {

View File

@@ -145,7 +145,7 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) {
return return
} }
en.Event = event.New() en.Event = event.New()
if r, err = en.Event.Unmarshal(r); chk.E(err) { if r, err = en.Event.Unmarshal(r); err != nil {
return return
} }
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
@@ -158,7 +158,7 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) {
// envelope into it. // envelope into it.
func ParseResult(b []byte) (t *Result, rem []byte, err error) { func ParseResult(b []byte) (t *Result, rem []byte, err error) {
t = NewResult() t = NewResult()
if rem, err = t.Unmarshal(b); chk.E(err) { if rem, err = t.Unmarshal(b); err != nil {
return return
} }
return return

View File

@@ -2,6 +2,7 @@ package event
import ( import (
"bytes" "bytes"
"fmt"
"github.com/minio/sha256-simd" "github.com/minio/sha256-simd"
"io" "io"
"orly.dev/pkg/crypto/ec/schnorr" "orly.dev/pkg/crypto/ec/schnorr"
@@ -300,7 +301,7 @@ AfterClose:
} }
return return
invalid: invalid:
err = errorf.E( err = fmt.Errorf(
"invalid key,\n'%s'\n'%s'\n'%s'", string(b), string(b[:len(r)]), "invalid key,\n'%s'\n'%s'\n'%s'", string(b), string(b[:len(r)]),
string(r), string(r),
) )

View File

@@ -0,0 +1,422 @@
package event
import (
"bytes"
"orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/tag"
"orly.dev/pkg/encoders/tags"
text2 "orly.dev/pkg/encoders/text"
"orly.dev/pkg/encoders/timestamp"
"testing"
)
// compareTags verifies that two tag collections hold identical contents,
// reporting every difference through t.Errorf with the given context label.
// Two nil collections compare equal; a nil/non-nil pair is an error.
func compareTags(t *testing.T, expected, actual *tags.T, context string) {
	if expected == nil {
		if actual == nil {
			return
		}
		t.Errorf("%s: One of the tags is nil", context)
		return
	}
	if actual == nil {
		t.Errorf("%s: One of the tags is nil", context)
		return
	}
	want := expected.ToStringsSlice()
	got := actual.ToStringsSlice()
	if len(want) != len(got) {
		t.Errorf(
			"%s: Tags length mismatch: expected %d, got %d", context,
			len(want), len(got),
		)
		return
	}
	for i := range want {
		wantTag, gotTag := want[i], got[i]
		if len(wantTag) != len(gotTag) {
			t.Errorf(
				"%s: Tag[%d] length mismatch: expected %d, got %d", context, i,
				len(wantTag), len(gotTag),
			)
			continue
		}
		for j := range wantTag {
			if wantTag[j] != gotTag[j] {
				t.Errorf(
					"%s: Tag[%d][%d] mismatch: expected '%s', got '%s'",
					context, i, j, wantTag[j], gotTag[j],
				)
			}
		}
	}
}
// TestUnmarshalEscapedJSONInTags tests that the Unmarshal function correctly
// handles tags with fields containing escaped JSON that has been escaped
// using NostrEscape.
func TestUnmarshalEscapedJSONInTags(t *testing.T) {
	// roundTrip builds an event whose sole tag is ["j", tagValue], marshals
	// it, unmarshals the result into a fresh event, and returns that event.
	// failMsg contextualizes a fatal unmarshal error. This replaces four
	// near-identical copies of the same fixture/round-trip code.
	roundTrip := func(t *testing.T, tagValue, content, failMsg string) *E {
		t.Helper()
		ev := &E{
			Id:        bytes.Repeat([]byte{0x01}, 32),
			Pubkey:    bytes.Repeat([]byte{0x02}, 32),
			CreatedAt: timestamp.FromUnix(1609459200),
			Kind:      kind.TextNote,
			Tags:      tags.New(),
			Content:   []byte(content),
			Sig:       bytes.Repeat([]byte{0x03}, 64),
		}
		ev.Tags.AppendTags(tag.New("j", tagValue))
		marshaled := ev.Marshal(nil)
		out := &E{}
		if _, err := out.Unmarshal(marshaled); err != nil {
			t.Fatalf("%s: %v", failMsg, err)
		}
		return out
	}
	// Test 1: Tag with a field containing escaped JSON
	t.Run("SimpleEscapedJSON", func(t *testing.T) {
		jsonContent := `{"key":"value","nested":{"array":[1,2,3]}}`
		ev := roundTrip(
			t, jsonContent, "Event with JSON in tag",
			"Failed to unmarshal event with JSON in tag",
		)
		// Verify the tag structure survived the round trip intact.
		if ev.Tags.Len() != 1 {
			t.Fatalf("Expected 1 tag, got %d", ev.Tags.Len())
		}
		tg := ev.Tags.GetTagElement(0)
		if tg.Len() != 2 {
			t.Fatalf("Expected tag with 2 elements, got %d", tg.Len())
		}
		if string(tg.B(0)) != "j" {
			t.Errorf("Expected tag key 'j', got '%s'", tg.B(0))
		}
		if string(tg.B(1)) != jsonContent {
			t.Errorf("Expected tag value '%s', got '%s'", jsonContent, tg.B(1))
		}
	})
	// Test 2: Tag with a field containing escaped JSON with special characters
	t.Run("EscapedJSONWithSpecialChars", func(t *testing.T) {
		// JSON with characters that need escaping: quotes, backslashes,
		// control chars (raw string literal, so backslashes are literal).
		jsonContent := `{"text":"This has \"quotes\" and \\ backslashes","newlines":"\n\r\t"}`
		ev := roundTrip(
			t, jsonContent, "Event with JSON containing special chars in tag",
			"Failed to unmarshal event with JSON containing special chars",
		)
		tg := ev.Tags.GetTagElement(0)
		if string(tg.B(1)) != jsonContent {
			t.Errorf("Expected tag value '%s', got '%s'", jsonContent, tg.B(1))
		}
	})
	// Test 3: Tag with nested JSON that contains already escaped content
	t.Run("NestedEscapedJSON", func(t *testing.T) {
		jsonContent := `{"escaped":"This JSON contains \\\"already escaped\\\" content"}`
		ev := roundTrip(
			t, jsonContent, "Event with nested escaped JSON in tag",
			"Failed to unmarshal event with nested escaped JSON",
		)
		tg := ev.Tags.GetTagElement(0)
		if string(tg.B(1)) != jsonContent {
			t.Errorf("Expected tag value '%s', got '%s'", jsonContent, tg.B(1))
		}
	})
	// Test 4: Tag with JSON that has been explicitly escaped using NostrEscape
	t.Run("ExplicitlyEscapedJSON", func(t *testing.T) {
		originalJSON := []byte(`{"key":"value with "quotes"","nested":{"array":[1,2,3],"special":"\n\r\t"}}`)
		// Explicitly escape the JSON using NostrEscape before embedding it.
		escapedJSON := text2.NostrEscape(
			make([]byte, 0, len(originalJSON)*2), originalJSON,
		)
		ev := roundTrip(
			t, string(escapedJSON), "Event with explicitly escaped JSON in tag",
			"Failed to unmarshal event with explicitly escaped JSON",
		)
		tg := ev.Tags.GetTagElement(0)
		if string(tg.B(1)) != string(escapedJSON) {
			t.Errorf("Expected tag value '%s', got '%s'", string(escapedJSON), tg.B(1))
		}
		// Unescape the unmarshaled value to verify it matches the original;
		// copy first because NostrUnescape works in place.
		unescapedJSON := make([]byte, len(tg.B(1)))
		copy(unescapedJSON, tg.B(1))
		unescapedJSON = text2.NostrUnescape(unescapedJSON)
		if string(unescapedJSON) != string(originalJSON) {
			t.Errorf("Unescaped JSON doesn't match original. Expected '%s', got '%s'", string(originalJSON), string(unescapedJSON))
		}
	})
}
func TestUnmarshalTags(t *testing.T) {
// Test 1: Simple event with empty tags
t.Run(
"EmptyTags", func(t *testing.T) {
jsonWithEmptyTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
expected := &E{
Id: bytes.Repeat([]byte{0x01}, 32),
Pubkey: bytes.Repeat([]byte{0x02}, 32),
CreatedAt: timestamp.FromUnix(1609459200),
Kind: kind.TextNote,
Tags: tags.New(),
Content: []byte("This is a test event"),
Sig: bytes.Repeat([]byte{0x03}, 64),
}
actual := &E{}
_, err := actual.Unmarshal(jsonWithEmptyTags)
if err != nil {
t.Fatalf("Failed to unmarshal JSON with empty tags: %v", err)
}
compareTags(t, expected.Tags, actual.Tags, "EmptyTags")
},
)
// Test 2: Event with simple tags
t.Run(
"SimpleTags", func(t *testing.T) {
jsonWithSimpleTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[["e","1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"],["p","abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"]],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
expected := &E{
Id: bytes.Repeat([]byte{0x01}, 32),
Pubkey: bytes.Repeat([]byte{0x02}, 32),
CreatedAt: timestamp.FromUnix(1609459200),
Kind: kind.TextNote,
Tags: tags.New(),
Content: []byte("This is a test event"),
Sig: bytes.Repeat([]byte{0x03}, 64),
}
// Add tags
eTag := tag.New(
"e",
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
)
pTag := tag.New(
"p",
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
)
expected.Tags.AppendTags(eTag, pTag)
actual := &E{}
_, err := actual.Unmarshal(jsonWithSimpleTags)
if err != nil {
t.Fatalf("Failed to unmarshal JSON with simple tags: %v", err)
}
compareTags(t, expected.Tags, actual.Tags, "SimpleTags")
},
)
// Test 3: Event with complex tags (more elements per tag)
t.Run(
"ComplexTags", func(t *testing.T) {
jsonWithComplexTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[["e","1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef","wss://relay.example.com","root"],["p","abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890","wss://relay.example.com"],["t","hashtag","topic"]],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
expected := &E{
Id: bytes.Repeat([]byte{0x01}, 32),
Pubkey: bytes.Repeat([]byte{0x02}, 32),
CreatedAt: timestamp.FromUnix(1609459200),
Kind: kind.TextNote,
Tags: tags.New(),
Content: []byte("This is a test event"),
Sig: bytes.Repeat([]byte{0x03}, 64),
}
// Add tags
eTag := tag.New(
"e",
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"wss://relay.example.com", "root",
)
pTag := tag.New(
"p",
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
"wss://relay.example.com",
)
tTag := tag.New("t", "hashtag", "topic")
expected.Tags.AppendTags(eTag, pTag, tTag)
actual := &E{}
_, err := actual.Unmarshal(jsonWithComplexTags)
if err != nil {
t.Fatalf("Failed to unmarshal JSON with complex tags: %v", err)
}
compareTags(t, expected.Tags, actual.Tags, "ComplexTags")
},
)
// Test 4: Test using the Unmarshal function (not the method)
t.Run(
"UnmarshalFunction", func(t *testing.T) {
jsonWithTags := []byte(`{
"id": "0101010101010101010101010101010101010101010101010101010101010101",
"pubkey": "0202020202020202020202020202020202020202020202020202020202020202",
"created_at": 1609459200,
"kind": 1,
"tags": [["e", "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"], ["p", "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"]],
"content": "This is a test event",
"sig": "03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"
}`)
expected := &E{
Id: bytes.Repeat([]byte{0x01}, 32),
Pubkey: bytes.Repeat([]byte{0x02}, 32),
CreatedAt: timestamp.FromUnix(1609459200),
Kind: kind.TextNote,
Tags: tags.New(),
Content: []byte("This is a test event"),
Sig: bytes.Repeat([]byte{0x03}, 64),
}
// Add tags
eTag := tag.New(
"e",
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
)
pTag := tag.New(
"p",
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
)
expected.Tags.AppendTags(eTag, pTag)
actual := &E{}
_, err := Unmarshal(actual, jsonWithTags)
if err != nil {
t.Fatalf(
"Failed to unmarshal JSON with tags using Unmarshal function: %v",
err,
)
}
compareTags(t, expected.Tags, actual.Tags, "UnmarshalFunction")
},
)
// Test 5: Event with nested empty tags
t.Run(
"NestedEmptyTags", func(t *testing.T) {
jsonWithNestedEmptyTags := []byte(`{
"id": "0101010101010101010101010101010101010101010101010101010101010101",
"pubkey": "0202020202020202020202020202020202020202020202020202020202020202",
"created_at": 1609459200,
"kind": 1,
"tags": [[], ["e"], ["p", ""]],
"content": "This is a test event",
"sig": "03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"
}`)
expected := &E{
Id: bytes.Repeat([]byte{0x01}, 32),
Pubkey: bytes.Repeat([]byte{0x02}, 32),
CreatedAt: timestamp.FromUnix(1609459200),
Kind: kind.TextNote,
Tags: tags.New(),
Content: []byte("This is a test event"),
Sig: bytes.Repeat([]byte{0x03}, 64),
}
// Add tags
emptyTag := tag.New[string]()
eTag := tag.New("e")
pTag := tag.New("p", "")
expected.Tags.AppendTags(emptyTag, eTag, pTag)
actual := &E{}
_, err := actual.Unmarshal(jsonWithNestedEmptyTags)
if err != nil {
t.Fatalf(
"Failed to unmarshal JSON with nested empty tags: %v", err,
)
}
compareTags(t, expected.Tags, actual.Tags, "NestedEmptyTags")
},
)
}

View File

@@ -378,6 +378,7 @@ var Map = map[uint16]string{
SearchRelaysList.K: "SearchRelaysList", SearchRelaysList.K: "SearchRelaysList",
InterestsList.K: "InterestsList", InterestsList.K: "InterestsList",
UserEmojiList.K: "UserEmojiList", UserEmojiList.K: "UserEmojiList",
DMRelaysList.K: "DMRelaysList",
FileStorageServerList.K: "FileStorageServerList", FileStorageServerList.K: "FileStorageServerList",
NWCWalletInfo.K: "NWCWalletInfo", NWCWalletInfo.K: "NWCWalletInfo",
LightningPubRPC.K: "LightningPubRPC", LightningPubRPC.K: "LightningPubRPC",

View File

@@ -20,12 +20,8 @@ type I interface {
authedPubkey []byte, remote string, authedPubkey []byte, remote string,
) (allowed *filters.T, accept bool, modified bool) ) (allowed *filters.T, accept bool, modified bool)
AddEvent( AddEvent(
c context.T, rl relay.I, ev *event.E, hr *http.Request, c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
origin string, authedPubkey []byte, ) (accepted bool, message []byte)
) (
accepted bool,
message []byte,
)
Context() context.T Context() context.T
Publisher() *publish.S Publisher() *publish.S
Publish(c context.T, evt *event.E) (err error) Publish(c context.T, evt *event.E) (err error)

View File

@@ -0,0 +1,43 @@
package auth
import (
"bytes"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/tag"
)
// CheckPrivilege reports whether the holder of authedPubkey is permitted to
// read the event ev.
//
// Events of non-privileged kinds are readable by anyone. For privileged
// kinds, access is granted only when the authed pubkey is the event author,
// or when it appears in one of the event's "p" (mention) tags — the author
// designating who may access the event, as with nip-04/nip-44 DMs and
// gift-wraps. An empty authedPubkey can never read a privileged event.
func CheckPrivilege(authedPubkey []byte, ev *event.E) (privileged bool) {
	if !ev.Kind.IsPrivileged() {
		// Not a privileged kind; anyone may read it.
		privileged = true
		return
	}
	if len(authedPubkey) == 0 {
		// Shortcut: none of the following tests could return true.
		return
	}
	// The event author may always read their own event.
	if bytes.Equal(ev.Pubkey, authedPubkey) {
		privileged = true
		return
	}
	// Otherwise, the authed pubkey must be mentioned in a "p" tag. Tag
	// values are hex-encoded, so encode the authed key before comparing.
	// BUG FIX: hex.EncAppend is an append-style API; its result must be
	// assigned back. Previously the result was discarded, leaving
	// hexAuthedKey empty so the mention check could never match.
	var hexAuthedKey []byte
	hexAuthedKey = hex.EncAppend(hexAuthedKey, authedPubkey)
	pTags := ev.Tags.GetAll(tag.New("p"))
	for _, pt := range pTags.ToSliceOfTags() {
		if bytes.Equal(pt.Value(), hexAuthedKey) {
			privileged = true
			return
		}
	}
	return
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,62 @@
package openapi
import (
"net/http"
"github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/adapters/humago"
"orly.dev/pkg/protocol/servemux"
"orly.dev/pkg/utils/lol"
)
// ExposeMiddleware stores the underlying http.Request and
// http.ResponseWriter in the huma context under the keys "http-request" and
// "http-response", so Operations handlers can retrieve them later.
func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) {
	lol.Tracer("ExposeMiddleware")
	defer func() { lol.Tracer("end ExposeMiddleware") }()
	// Unwrap the raw request/response pair from the huma context.
	req, rw := humago.Unwrap(ctx)
	ctx = huma.WithValue(ctx, "http-request", req)
	ctx = huma.WithValue(ctx, "http-response", rw)
	next(ctx)
}
// NewHuma creates a new huma.API with a Scalar docs UI, and a middleware
// that allows methods to access the http.Request and http.ResponseWriter.
//
// # Parameters
//
//   - router: the serve mux the API and the /api docs page are mounted on.
//
//   - name, version: passed to huma.DefaultConfig as the API title/version.
//
//   - description: becomes the OpenAPI Info.Description.
//
// # Return Values
//
//   - api: the configured huma.API with ExposeMiddleware installed.
func NewHuma(
	router *servemux.S, name, version, description string,
) (api huma.API) {
	lol.Tracer("NewHuma", name, version, description)
	defer func() { lol.Tracer("end NewHuma") }()
	config := huma.DefaultConfig(name, version)
	config.Info.Description = description
	// Disable huma's built-in docs route; the handler registered on /api
	// below serves a Scalar UI page instead.
	config.DocsPath = ""
	config.OpenAPIPath = "/api/openapi"
	router.HandleFunc(
		"/api", func(w http.ResponseWriter, r *http.Request) {
			w.Header().Set("Content-Type", "text/html")
			// NOTE(review): the Write error is ignored here; presumably
			// acceptable for a static docs page — confirm.
			w.Write(
				[]byte(`<!DOCTYPE html>
<html lang="en">
<head>
<title>realy HTTP API UI</title>
<meta charset="utf-8" />
<meta
name="viewport"
content="width=device-width, initial-scale=1" />
</head>
<body>
<script
id="api-reference"
data-url="/api/openapi.json"></script>
<script src="https://cdn.jsdelivr.net/npm/@scalar/api-reference"></script>
</body>
</html>`),
			)
		},
	)
	api = humago.New(router, config)
	api.UseMiddleware(ExposeMiddleware)
	return
}

View File

@@ -0,0 +1,27 @@
package openapi
import (
"github.com/danielgtaylor/huma/v2"
"orly.dev/pkg/interfaces/server"
"orly.dev/pkg/protocol/servemux"
"orly.dev/pkg/utils/lol"
)
// Operations carries the server implementation and its mount path for the
// handler methods that huma.AutoRegister discovers and registers. The
// embedded server.I and *servemux.S expose their methods on Operations.
type Operations struct {
	server.I
	// path is the base path the operations are served under.
	path string
	*servemux.S
}
// New creates a new openapi.Operations and registers its methods.
//
// It builds a huma API (with the Scalar docs UI and request/response
// exposing middleware) on sm, then auto-registers the Operations handlers
// bound to the given server and path. It has no return value.
func New(
	s server.I, name, version, description string, path string,
	sm *servemux.S,
) {
	lol.Tracer("New", name, version, description, path)
	defer func() { lol.Tracer("end New") }()
	a := NewHuma(sm, name, version, description)
	huma.AutoRegister(a, &Operations{I: s, path: path})
	// Redundant trailing `return` removed: the function has no results.
}

View File

@@ -161,16 +161,6 @@ func (a *A) HandleEvent(
if len(split) != 3 { if len(split) != 3 {
continue continue
} }
// Check if the deletion event is trying to delete itself
if bytes.Equal(split[2], env.E.Id) {
if err = Ok.Blocked(
a, env,
"deletion event cannot reference its own ID",
); chk.E(err) {
return
}
return
}
var pk []byte var pk []byte
if pk, err = hex.DecAppend(nil, split[1]); chk.E(err) { if pk, err = hex.DecAppend(nil, split[1]); chk.E(err) {
if err = Ok.Invalid( if err = Ok.Invalid(
@@ -185,7 +175,8 @@ func (a *A) HandleEvent(
kin := ints.New(uint16(0)) kin := ints.New(uint16(0))
if _, err = kin.Unmarshal(split[0]); chk.E(err) { if _, err = kin.Unmarshal(split[0]); chk.E(err) {
if err = Ok.Invalid( if err = Ok.Invalid(
a, env, "delete event a tag kind value invalid: %s", a, env, "delete event a tag kind value "+
"invalid: %s",
t.Value(), t.Value(),
); chk.E(err) { ); chk.E(err) {
return return
@@ -195,7 +186,8 @@ func (a *A) HandleEvent(
kk := kind.New(kin.Uint16()) kk := kind.New(kin.Uint16())
if kk.Equal(kind.Deletion) { if kk.Equal(kind.Deletion) {
if err = Ok.Blocked( if err = Ok.Blocked(
a, env, "delete event kind may not be deleted", a, env, "delete event kind may not be "+
"deleted",
); chk.E(err) { ); chk.E(err) {
return return
} }
@@ -204,7 +196,8 @@ func (a *A) HandleEvent(
if !kk.IsParameterizedReplaceable() { if !kk.IsParameterizedReplaceable() {
if err = Ok.Error( if err = Ok.Error(
a, env, a, env,
"delete tags with a tags containing non-parameterized-replaceable events can't be processed", "delete tags with a tags containing "+
"non-parameterized-replaceable events can't be processed",
); chk.E(err) { ); chk.E(err) {
return return
} }
@@ -325,9 +318,7 @@ func (a *A) HandleEvent(
} }
} }
var reason []byte var reason []byte
ok, reason = srv.AddEvent( ok, reason = srv.AddEvent(c, rl, env.E, a.Req(), a.RealRemote())
c, rl, env.E, a.Req(), a.RealRemote(), a.Listener.AuthedPubkey(),
)
log.I.F("event %0x added %v, %s", env.E.Id, ok, reason) log.I.F("event %0x added %v, %s", env.E.Id, ok, reason)
if err = okenvelope.NewFrom(env.E.Id, ok).Write(a.Listener); chk.E(err) { if err = okenvelope.NewFrom(env.E.Id, ok).Write(a.Listener); chk.E(err) {
return return

View File

@@ -26,6 +26,7 @@ import (
// corresponding handler method, generates a notice for errors or unknown types, // corresponding handler method, generates a notice for errors or unknown types,
// logs the notice, and writes it back to the listener if required. // logs the notice, and writes it back to the listener if required.
func (a *A) HandleMessage(msg []byte) { func (a *A) HandleMessage(msg []byte) {
log.T.F("%s received message:\n%s", a.Listener.RealRemote(), string(msg))
var notice []byte var notice []byte
var err error var err error
var t string var t string

View File

@@ -1,7 +1,6 @@
package socketapi package socketapi
import ( import (
"bytes"
"errors" "errors"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"orly.dev/pkg/encoders/envelopes/closedenvelope" "orly.dev/pkg/encoders/envelopes/closedenvelope"
@@ -9,9 +8,8 @@ import (
"orly.dev/pkg/encoders/envelopes/eventenvelope" "orly.dev/pkg/encoders/envelopes/eventenvelope"
"orly.dev/pkg/encoders/envelopes/reqenvelope" "orly.dev/pkg/encoders/envelopes/reqenvelope"
"orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/tag"
"orly.dev/pkg/interfaces/server" "orly.dev/pkg/interfaces/server"
"orly.dev/pkg/protocol/auth"
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/log" "orly.dev/pkg/utils/log"
@@ -80,8 +78,7 @@ func (a *A) HandleReq(
continue continue
} }
} }
if events, err = sto.QueryEvents(c, f); chk.E(err) { if events, err = sto.QueryEvents(c, f); err != nil {
log.E.F("eventstore: %v", err)
if errors.Is(err, badger.ErrDBClosed) { if errors.Is(err, badger.ErrDBClosed) {
return return
} }
@@ -91,39 +88,14 @@ func (a *A) HandleReq(
if srv.AuthRequired() { if srv.AuthRequired() {
var tmp event.S var tmp event.S
for _, ev := range events { for _, ev := range events {
if ev.Kind.IsPrivileged() { if !auth.CheckPrivilege(a.Listener.AuthedPubkey(), ev) {
authedPubkey := a.Listener.AuthedPubkey() log.W.F(
if len(authedPubkey) == 0 { "not privileged %0x ev pubkey %0x ev pubkey %0x kind %s privileged: %v",
// this is a shortcut because none of the following a.Listener.AuthedPubkey(), ev.Pubkey,
// tests would return true. a.Listener.AuthedPubkey(), ev.Kind.Name(),
continue ev.Kind.IsPrivileged(),
} )
// authed users when auth is required must be present in the continue
// event if it is privileged.
authedIsAuthor := bytes.Equal(ev.Pubkey, authedPubkey)
// if the authed pubkey matches the event author, it is
// allowed.
if !authedIsAuthor {
// check whether one of the p (mention) tags is
// present designating the authed pubkey, as this means
// the author wants the designated pubkey to be able to
// access the event. this is the case for nip-4, nip-44
// DMs, and gift-wraps. The query would usually have
// been for precisely a p tag with their pubkey.
eTags := ev.Tags.GetAll(tag.New("p"))
var hexAuthedKey []byte
hex.EncAppend(hexAuthedKey, authedPubkey)
var authedIsMentioned bool
for _, e := range eTags.ToSliceOfTags() {
if bytes.Equal(e.Value(), hexAuthedKey) {
authedIsMentioned = true
break
}
}
if !authedIsMentioned {
continue
}
}
} }
tmp = append(tmp, ev) tmp = append(tmp, ev)
} }

View File

@@ -5,19 +5,16 @@ import (
"orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/filters" "orly.dev/pkg/encoders/filters"
"orly.dev/pkg/interfaces/publisher" "orly.dev/pkg/interfaces/publisher"
"orly.dev/pkg/interfaces/server"
"orly.dev/pkg/protocol/auth"
"orly.dev/pkg/protocol/ws" "orly.dev/pkg/protocol/ws"
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log" "orly.dev/pkg/utils/log"
"regexp"
"sync" "sync"
) )
const Type = "socketapi" const Type = "socketapi"
var (
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
)
// Map is a map of filters associated with a collection of ws.Listener // Map is a map of filters associated with a collection of ws.Listener
// connections. // connections.
type Map map[*ws.Listener]map[string]*filters.T type Map map[*ws.Listener]map[string]*filters.T
@@ -57,11 +54,13 @@ type S struct {
Mx sync.Mutex Mx sync.Mutex
// Map is the map of subscribers and subscriptions from the websocket api. // Map is the map of subscribers and subscriptions from the websocket api.
Map Map
// Server is an interface to the server.
Server server.I
} }
var _ publisher.I = &S{} var _ publisher.I = &S{}
func New() (publisher *S) { return &S{Map: make(Map)} } func New(s server.I) (publisher *S) { return &S{Map: make(Map), Server: s} }
func (p *S) Type() (typeName string) { return Type } func (p *S) Type() (typeName string) { return Type }
@@ -98,6 +97,7 @@ func (p *S) Receive(msg publisher.Message) {
return return
} }
p.Mx.Lock() p.Mx.Lock()
defer p.Mx.Unlock()
if subs, ok := p.Map[m.Listener]; !ok { if subs, ok := p.Map[m.Listener]; !ok {
subs = make(map[string]*filters.T) subs = make(map[string]*filters.T)
subs[m.Id] = m.Filters subs[m.Id] = m.Filters
@@ -112,54 +112,56 @@ func (p *S) Receive(msg publisher.Message) {
"added subscription %s for %s", m.Id, m.Listener.RealRemote(), "added subscription %s for %s", m.Id, m.Listener.RealRemote(),
) )
} }
p.Mx.Unlock()
} }
} }
// Deliver sends an event to all subscribers whose filters match the event // Deliver processes and distributes an event to all matching subscribers based on their filter configurations.
// //
// # Parameters // # Parameters
// //
// - ev (*event.E): The event to deliver to matching subscribers // - ev (*event.E): The event to be delivered to subscribed clients.
// //
// # Expected behaviour // # Expected behaviour
// //
// # Locks the mutex to synchronize access to subscriber data // Delivers the event to all subscribers whose filters match the event. It
// // applies authentication checks if required by the server, and skips delivery
// # Iterates over all websocket connections and their associated subscriptions // for unauthenticated users when events are privileged.
//
// # Checks if each subscription's filter matches the event being delivered
//
// # Creates an event envelope result for matching subscriptions
//
// # Writes the result to the corresponding websocket connection
//
// Logs details about event delivery and any errors encountered
func (p *S) Deliver(ev *event.E) { func (p *S) Deliver(ev *event.E) {
log.T.F("delivering event %0x to subscribers", ev.Id) log.T.F("delivering event %0x to subscribers", ev.Id)
var err error var err error
p.Mx.Lock() p.Mx.Lock()
defer p.Mx.Unlock()
for w, subs := range p.Map { for w, subs := range p.Map {
log.I.F("%v %s", subs, w.RealRemote()) // log.I.F("%v %s", subs, w.RealRemote())
for id, subscriber := range subs { for id, subscriber := range subs {
log.T.F( // log.T.F(
"subscriber %s\n%s", w.RealRemote(), // "subscriber %s\n%s", w.RealRemote(),
subscriber.Marshal(nil), // subscriber.Marshal(nil),
) // )
if !subscriber.Match(ev) { if !subscriber.Match(ev) {
continue continue
} }
var res *eventenvelope.Result if p.Server.AuthRequired() {
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) { if !auth.CheckPrivilege(w.AuthedPubkey(), ev) {
continue log.W.F(
"not privileged %0x ev pubkey %0x ev pubkey %0x kind %s privileged: %v",
w.AuthedPubkey(), ev.Pubkey,
w.AuthedPubkey(), ev.Kind.Name(),
ev.Kind.IsPrivileged(),
)
continue
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
continue
}
if err = res.Write(w); chk.E(err) {
continue
}
log.T.F("dispatched event %0x to subscription %s", ev.Id, id)
} }
if err = res.Write(w); chk.E(err) {
continue
}
log.T.F("dispatched event %0x to subscription %s", ev.Id, id)
} }
} }
p.Mx.Unlock()
} }
// removeSubscriberId removes a specific subscription from a subscriber // removeSubscriberId removes a specific subscription from a subscriber

View File

@@ -234,9 +234,10 @@ func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
// general message reader loop // general message reader loop
go func() { go func() {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
var err error
for { for {
buf.Reset() buf.Reset()
if err := conn.ReadMessage(r.connectionContext, buf); chk.T(err) { if err = conn.ReadMessage(r.connectionContext, buf); err != nil {
r.ConnectionError = err r.ConnectionError = err
r.Close() r.Close()
break break
@@ -270,10 +271,12 @@ func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
} }
r.challenge = env.Challenge r.challenge = env.Challenge
case eventenvelope.L: case eventenvelope.L:
// log.I.F("message: %s", message)
env := eventenvelope.NewResult() env := eventenvelope.NewResult()
if env, message, err = eventenvelope.ParseResult(message); chk.E(err) { if env, message, err = eventenvelope.ParseResult(message); err != nil {
continue continue
} }
// log.I.F("%s", env.Event.Marshal(nil))
if len(env.Subscription.T) == 0 { if len(env.Subscription.T) == 0 {
continue continue
} }
@@ -497,12 +500,13 @@ func (r *Client) PrepareSubscription(
return sub return sub
} }
// QuerySync is only used in tests. The realy query method is synchronous now // QuerySync is only used in tests. The relay query method is synchronous now
// anyway (it ensures sort order is respected). // anyway (it ensures sort order is respected).
func (r *Client) QuerySync( func (r *Client) QuerySync(
ctx context.T, f *filter.F, ctx context.T, f *filter.F,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) ([]*event.E, error) { ) ([]*event.E, error) {
// log.T.F("QuerySync:\n%s", f.Marshal(nil))
sub, err := r.Subscribe(ctx, filters.New(f), opts...) sub, err := r.Subscribe(ctx, filters.New(f), opts...)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"compress/flate" "compress/flate"
"crypto/tls" "crypto/tls"
"errors" "fmt"
"github.com/gobwas/httphead" "github.com/gobwas/httphead"
"github.com/gobwas/ws" "github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate" "github.com/gobwas/ws/wsflate"
@@ -113,7 +113,10 @@ func NewConnection(
func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) { func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
select { select {
case <-c.Done(): case <-c.Done():
return errors.New("context canceled") return errorf.E(
"%s context canceled",
cn.conn.RemoteAddr(),
)
default: default:
} }
if cn.msgStateW.IsCompressed() && cn.enableCompression { if cn.msgStateW.IsCompressed() && cn.enableCompression {
@@ -121,19 +124,35 @@ func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
if _, err := io.Copy( if _, err := io.Copy(
cn.flateWriter, bytes.NewReader(data), cn.flateWriter, bytes.NewReader(data),
); chk.T(err) { ); chk.T(err) {
return errorf.E("failed to write message: %w", err) return errorf.E(
"%s failed to write message: %w",
cn.conn.RemoteAddr(),
err,
)
} }
if err := cn.flateWriter.Close(); chk.T(err) { if err := cn.flateWriter.Close(); chk.T(err) {
return errorf.E("failed to close flate writer: %w", err) return errorf.E(
"%s failed to close flate writer: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} else { } else {
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) { if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
return errorf.E("failed to write message: %w", err) return errorf.E(
"%s failed to write message: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} }
if err := cn.writer.Flush(); chk.T(err) { if err := cn.writer.Flush(); chk.T(err) {
return errorf.E("failed to flush writer: %w", err) return errorf.E(
"%s failed to flush writer: %w",
cn.conn.RemoteAddr(),
err,
)
} }
return nil return nil
} }
@@ -143,34 +162,57 @@ func (cn *Connection) ReadMessage(c context.T, buf io.Writer) (err error) {
for { for {
select { select {
case <-c.Done(): case <-c.Done():
return errors.New("context canceled") return errorf.D(
"%s context canceled",
cn.conn.RemoteAddr(),
)
default: default:
} }
h, err := cn.reader.NextFrame() h, err := cn.reader.NextFrame()
if err != nil { if err != nil {
cn.conn.Close() cn.conn.Close()
return errorf.E("failed to advance frame: %w", err) return fmt.Errorf(
"%s failed to advance frame: %s",
cn.conn.RemoteAddr(),
err.Error(),
)
} }
if h.OpCode.IsControl() { if h.OpCode.IsControl() {
if err := cn.controlHandler(h, cn.reader); chk.T(err) { if err := cn.controlHandler(h, cn.reader); chk.T(err) {
return errorf.E("failed to handle control frame: %w", err) return errorf.E(
"%s failed to handle control frame: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} else if h.OpCode == ws.OpBinary || } else if h.OpCode == ws.OpBinary ||
h.OpCode == ws.OpText { h.OpCode == ws.OpText {
break break
} }
if err := cn.reader.Discard(); chk.T(err) { if err := cn.reader.Discard(); chk.T(err) {
return errorf.E("failed to discard: %w", err) return errorf.E(
"%s failed to discard: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} }
if cn.msgStateR.IsCompressed() && cn.enableCompression { if cn.msgStateR.IsCompressed() && cn.enableCompression {
cn.flateReader.Reset(cn.reader) cn.flateReader.Reset(cn.reader)
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) { if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
return errorf.E("failed to read message: %w", err) return errorf.E(
"%s failed to read message: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} else { } else {
if _, err := io.Copy(buf, cn.reader); chk.T(err) { if _, err := io.Copy(buf, cn.reader); chk.T(err) {
return errorf.E("failed to read message: %w", err) return errorf.E(
"%s failed to read message: %w",
cn.conn.RemoteAddr(),
err,
)
} }
} }
return nil return nil

View File

@@ -10,7 +10,6 @@ import (
"orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context" "orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/errorf" "orly.dev/pkg/utils/errorf"
"orly.dev/pkg/utils/log"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -158,7 +157,6 @@ func (sub *Subscription) Close() {
closeMsg := closeenvelope.NewFrom(id) closeMsg := closeenvelope.NewFrom(id)
var b []byte var b []byte
b = closeMsg.Marshal(nil) b = closeMsg.Marshal(nil)
log.T.F("{%s} sending %s", sub.Relay.URL, b)
<-sub.Relay.Write(b) <-sub.Relay.Write(b)
} }
} }