Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
9b7e8d28de
|
|||
|
c16ee76638
|
|||
|
132fdc9f36
|
|||
|
4f1d48c247
|
|||
|
651791aec1
|
|||
|
53d649c64e
|
|||
|
4dafab3fd6
|
|||
|
f2475c48b7
|
|||
|
b5448f4153
|
|||
|
11d318d4e3
|
|||
|
53e8e160dd
|
|||
|
90c9198ebe
|
|||
|
4bbbbb1bb6
|
|||
|
56ab6eaa81
|
|||
|
e3c931fcf9
|
1
main.go
1
main.go
@@ -42,7 +42,6 @@ func main() {
|
|||||||
config.PrintHelp(cfg, os.Stderr)
|
config.PrintHelp(cfg, os.Stderr)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
log.I.Ln("log level", cfg.LogLevel)
|
|
||||||
lol.SetLogLevel(cfg.LogLevel)
|
lol.SetLogLevel(cfg.LogLevel)
|
||||||
if cfg.Pprof {
|
if cfg.Pprof {
|
||||||
defer profile.Start(profile.MemProfile).Stop()
|
defer profile.Start(profile.MemProfile).Stop()
|
||||||
|
|||||||
@@ -87,7 +87,6 @@ func New() (cfg *C, err error) {
|
|||||||
lol.SetLogLevel(cfg.LogLevel)
|
lol.SetLogLevel(cfg.LogLevel)
|
||||||
log.I.F("loaded configuration from %s", envPath)
|
log.I.F("loaded configuration from %s", envPath)
|
||||||
}
|
}
|
||||||
log.I.S(cfg)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,16 +3,20 @@ package relay
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"orly.dev/pkg/encoders/event"
|
"orly.dev/pkg/encoders/event"
|
||||||
"orly.dev/pkg/interfaces/relay"
|
"orly.dev/pkg/interfaces/relay"
|
||||||
"orly.dev/pkg/interfaces/store"
|
"orly.dev/pkg/interfaces/store"
|
||||||
"orly.dev/pkg/protocol/socketapi"
|
|
||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/normalize"
|
"orly.dev/pkg/utils/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
|
||||||
|
)
|
||||||
|
|
||||||
// AddEvent processes an incoming event, saves it if valid, and delivers it to
|
// AddEvent processes an incoming event, saves it if valid, and delivers it to
|
||||||
// subscribers.
|
// subscribers.
|
||||||
//
|
//
|
||||||
@@ -50,9 +54,7 @@ import (
|
|||||||
// - Returns a boolean indicating whether the event was accepted and any
|
// - Returns a boolean indicating whether the event was accepted and any
|
||||||
// relevant message.
|
// relevant message.
|
||||||
func (s *Server) AddEvent(
|
func (s *Server) AddEvent(
|
||||||
c context.T, rl relay.I, ev *event.E,
|
c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
|
||||||
hr *http.Request, origin string,
|
|
||||||
authedPubkey []byte,
|
|
||||||
) (accepted bool, message []byte) {
|
) (accepted bool, message []byte) {
|
||||||
|
|
||||||
if ev == nil {
|
if ev == nil {
|
||||||
@@ -65,9 +67,12 @@ func (s *Server) AddEvent(
|
|||||||
return false, []byte(saveErr.Error())
|
return false, []byte(saveErr.Error())
|
||||||
}
|
}
|
||||||
errmsg := saveErr.Error()
|
errmsg := saveErr.Error()
|
||||||
if socketapi.NIP20prefixmatcher.MatchString(errmsg) {
|
if NIP20prefixmatcher.MatchString(errmsg) {
|
||||||
if strings.Contains(errmsg, "tombstone") {
|
if strings.Contains(errmsg, "tombstone") {
|
||||||
return false, normalize.Error.F("event was deleted, not storing it again")
|
return false, normalize.Error.F(
|
||||||
|
"%s event was deleted, not storing it again",
|
||||||
|
origin,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(errmsg, string(normalize.Blocked)) {
|
if strings.HasPrefix(errmsg, string(normalize.Blocked)) {
|
||||||
return false, []byte(errmsg)
|
return false, []byte(errmsg)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"orly.dev/pkg/protocol/socketapi"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -15,7 +16,6 @@ import (
|
|||||||
"orly.dev/pkg/app/relay/publish"
|
"orly.dev/pkg/app/relay/publish"
|
||||||
"orly.dev/pkg/interfaces/relay"
|
"orly.dev/pkg/interfaces/relay"
|
||||||
"orly.dev/pkg/protocol/servemux"
|
"orly.dev/pkg/protocol/servemux"
|
||||||
"orly.dev/pkg/protocol/socketapi"
|
|
||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/log"
|
"orly.dev/pkg/utils/log"
|
||||||
@@ -90,15 +90,15 @@ func NewServer(sp *ServerParams, opts ...options.O) (s *Server, err error) {
|
|||||||
}
|
}
|
||||||
serveMux := servemux.NewServeMux()
|
serveMux := servemux.NewServeMux()
|
||||||
s = &Server{
|
s = &Server{
|
||||||
Ctx: sp.Ctx,
|
Ctx: sp.Ctx,
|
||||||
Cancel: sp.Cancel,
|
Cancel: sp.Cancel,
|
||||||
relay: sp.Rl,
|
relay: sp.Rl,
|
||||||
mux: serveMux,
|
mux: serveMux,
|
||||||
options: op,
|
options: op,
|
||||||
listeners: publish.New(socketapi.New()),
|
C: sp.C,
|
||||||
C: sp.C,
|
Lists: new(Lists),
|
||||||
Lists: new(Lists),
|
|
||||||
}
|
}
|
||||||
|
s.listeners = publish.New(socketapi.New(s))
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.relay.Init(); chk.E(err) {
|
if err := s.relay.Init(); chk.E(err) {
|
||||||
s.Shutdown()
|
s.Shutdown()
|
||||||
@@ -191,7 +191,7 @@ func (s *Server) Start(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// start up a spider run to trigger every 30 minutes
|
// start up a spider run to trigger every 30 minutes
|
||||||
ticker := time.NewTicker(30 * time.Minute)
|
ticker := time.NewTicker(time.Hour)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"orly.dev/pkg/encoders/event"
|
"orly.dev/pkg/encoders/event"
|
||||||
"orly.dev/pkg/encoders/filter"
|
"orly.dev/pkg/encoders/filter"
|
||||||
"orly.dev/pkg/encoders/hex"
|
"orly.dev/pkg/encoders/hex"
|
||||||
"orly.dev/pkg/encoders/kind"
|
|
||||||
"orly.dev/pkg/encoders/kinds"
|
"orly.dev/pkg/encoders/kinds"
|
||||||
"orly.dev/pkg/encoders/tag"
|
"orly.dev/pkg/encoders/tag"
|
||||||
"orly.dev/pkg/protocol/ws"
|
"orly.dev/pkg/protocol/ws"
|
||||||
@@ -13,15 +12,16 @@ import (
|
|||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/log"
|
"orly.dev/pkg/utils/log"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) SpiderFetch(
|
func (s *Server) SpiderFetch(
|
||||||
k *kind.T, noFetch bool, pubkeys ...[]byte,
|
k *kinds.T, noFetch, noExtract bool, pubkeys ...[]byte,
|
||||||
) (pks [][]byte, err error) {
|
) (pks [][]byte, err error) {
|
||||||
// first search the local database
|
// first search the local database
|
||||||
pkList := tag.New(pubkeys...)
|
pkList := tag.New(pubkeys...)
|
||||||
f := &filter.F{
|
f := &filter.F{
|
||||||
Kinds: kinds.New(k),
|
Kinds: k,
|
||||||
Authors: pkList,
|
Authors: pkList,
|
||||||
}
|
}
|
||||||
var evs event.S
|
var evs event.S
|
||||||
@@ -29,54 +29,83 @@ func (s *Server) SpiderFetch(
|
|||||||
// none were found, so we need to scan the spiders
|
// none were found, so we need to scan the spiders
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
if len(evs) < len(pubkeys) && !noFetch {
|
var kindsList string
|
||||||
|
for i, kk := range k.K {
|
||||||
|
if i > 0 {
|
||||||
|
kindsList += ","
|
||||||
|
}
|
||||||
|
kindsList += kk.Name()
|
||||||
|
}
|
||||||
|
log.I.F("%d events found of type %s", len(evs), kindsList)
|
||||||
|
// for _, ev := range evs {
|
||||||
|
// o += fmt.Sprintf("%s\n\n", ev.Marshal(nil))
|
||||||
|
// }
|
||||||
|
// log.I.F("%s", o)
|
||||||
|
if !noFetch {
|
||||||
// we need to search the spider seeds.
|
// we need to search the spider seeds.
|
||||||
// Break up pubkeys into batches of 512
|
// Break up pubkeys into batches of 128
|
||||||
log.I.F("breaking up %d pubkeys into batches of 512", len(pubkeys))
|
for i := 0; i < len(pubkeys); i += 128 {
|
||||||
for i := 0; i < len(pubkeys); i += 512 {
|
end := i + 128
|
||||||
end := i + 512
|
|
||||||
if end > len(pubkeys) {
|
if end > len(pubkeys) {
|
||||||
end = len(pubkeys)
|
end = len(pubkeys)
|
||||||
}
|
}
|
||||||
batchPubkeys := pubkeys[i:end]
|
batchPubkeys := pubkeys[i:end]
|
||||||
log.I.F(
|
log.I.F(
|
||||||
"processing batch %d to %d of %d pubkeys", i, end, len(pubkeys),
|
"processing batch %d to %d of %d for kind %s",
|
||||||
|
i, end, len(pubkeys), kindsList,
|
||||||
)
|
)
|
||||||
batchPkList := tag.New(batchPubkeys...)
|
batchPkList := tag.New(batchPubkeys...)
|
||||||
|
lim := uint(batchPkList.Len())
|
||||||
batchFilter := &filter.F{
|
batchFilter := &filter.F{
|
||||||
Kinds: kinds.New(k),
|
Kinds: k,
|
||||||
Authors: batchPkList,
|
Authors: batchPkList,
|
||||||
|
Limit: &lim,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var mx sync.Mutex
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, seed := range s.C.SpiderSeeds {
|
for _, seed := range s.C.SpiderSeeds {
|
||||||
select {
|
wg.Add(1)
|
||||||
case <-s.Ctx.Done():
|
go func() {
|
||||||
return
|
defer wg.Done()
|
||||||
default:
|
select {
|
||||||
}
|
case <-s.Ctx.Done():
|
||||||
var evss event.S
|
return
|
||||||
var cli *ws.Client
|
default:
|
||||||
if cli, err = ws.RelayConnect(context.Bg(), seed); chk.E(err) {
|
}
|
||||||
err = nil
|
var evss event.S
|
||||||
continue
|
var cli *ws.Client
|
||||||
}
|
if cli, err = ws.RelayConnect(
|
||||||
if evss, err = cli.QuerySync(
|
context.Bg(), seed,
|
||||||
context.Bg(), batchFilter,
|
); chk.E(err) {
|
||||||
); chk.E(err) {
|
err = nil
|
||||||
err = nil
|
return
|
||||||
continue
|
}
|
||||||
}
|
if evss, err = cli.QuerySync(
|
||||||
for _, ev := range evss {
|
context.Bg(), batchFilter,
|
||||||
evs = append(evs, ev)
|
); chk.E(err) {
|
||||||
}
|
err = nil
|
||||||
}
|
return
|
||||||
}
|
}
|
||||||
// save the events to the database
|
mx.Lock()
|
||||||
for _, ev := range evs {
|
// save the events to the database
|
||||||
if _, _, err = s.Storage().SaveEvent(s.Ctx, ev); chk.E(err) {
|
for _, ev := range evss {
|
||||||
err = nil
|
log.I.F("saving event:\n%s", ev.Marshal(nil))
|
||||||
continue
|
if _, _, err = s.Storage().SaveEvent(
|
||||||
|
s.Ctx, ev,
|
||||||
|
); chk.E(err) {
|
||||||
|
err = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, ev := range evss {
|
||||||
|
evs = append(evs, ev)
|
||||||
|
}
|
||||||
|
mx.Unlock()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// deduplicate and take the newest
|
// deduplicate and take the newest
|
||||||
@@ -95,7 +124,10 @@ func (s *Server) SpiderFetch(
|
|||||||
tmp = append(tmp, evm[0])
|
tmp = append(tmp, evm[0])
|
||||||
}
|
}
|
||||||
evs = tmp
|
evs = tmp
|
||||||
// we have all we're going to get now
|
// we have all we're going to get now, extract the p tags
|
||||||
|
if noExtract {
|
||||||
|
return
|
||||||
|
}
|
||||||
pkMap := make(map[string]struct{})
|
pkMap := make(map[string]struct{})
|
||||||
for _, ev := range evs {
|
for _, ev := range evs {
|
||||||
t := ev.Tags.GetAll(tag.New("p"))
|
t := ev.Tags.GetAll(tag.New("p"))
|
||||||
@@ -105,7 +137,7 @@ func (s *Server) SpiderFetch(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pk := make([]byte, schnorr.PubKeyBytesLen)
|
pk := make([]byte, schnorr.PubKeyBytesLen)
|
||||||
if _, err = hex.DecBytes(pk, pkh); chk.E(err) {
|
if _, err = hex.DecBytes(pk, pkh); err != nil {
|
||||||
err = nil
|
err = nil
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"orly.dev/pkg/encoders/bech32encoding"
|
"orly.dev/pkg/encoders/bech32encoding"
|
||||||
"orly.dev/pkg/encoders/hex"
|
"orly.dev/pkg/encoders/hex"
|
||||||
"orly.dev/pkg/encoders/kind"
|
"orly.dev/pkg/encoders/kind"
|
||||||
|
"orly.dev/pkg/encoders/kinds"
|
||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/log"
|
"orly.dev/pkg/utils/log"
|
||||||
)
|
)
|
||||||
@@ -44,90 +45,91 @@ func (s *Server) Spider(noFetch ...bool) (err error) {
|
|||||||
// there is no OwnersPubkeys, so there is nothing to do.
|
// there is no OwnersPubkeys, so there is nothing to do.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dontFetch := false
|
go func() {
|
||||||
if len(noFetch) > 0 && noFetch[0] {
|
dontFetch := false
|
||||||
dontFetch = true
|
if len(noFetch) > 0 && noFetch[0] {
|
||||||
}
|
dontFetch = true
|
||||||
log.I.F("getting ownersFollowed")
|
|
||||||
var ownersFollowed [][]byte
|
|
||||||
if ownersFollowed, err = s.SpiderFetch(
|
|
||||||
kind.FollowList, dontFetch, ownersPubkeys...,
|
|
||||||
); chk.E(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.I.F("getting followedFollows")
|
|
||||||
var followedFollows [][]byte
|
|
||||||
if followedFollows, err = s.SpiderFetch(
|
|
||||||
kind.FollowList, dontFetch, ownersFollowed...,
|
|
||||||
); chk.E(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.I.F("getting ownersMuted")
|
|
||||||
var ownersMuted [][]byte
|
|
||||||
if ownersMuted, err = s.SpiderFetch(
|
|
||||||
kind.MuteList, dontFetch, ownersPubkeys...,
|
|
||||||
); chk.E(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// remove the ownersFollowed and ownersMuted items from the followedFollows
|
|
||||||
// list
|
|
||||||
filteredFollows := make([][]byte, 0, len(followedFollows))
|
|
||||||
for _, follow := range followedFollows {
|
|
||||||
found := false
|
|
||||||
for _, owner := range ownersFollowed {
|
|
||||||
if bytes.Equal(follow, owner) {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for _, owner := range ownersMuted {
|
log.I.F("getting ownersFollowed")
|
||||||
if bytes.Equal(follow, owner) {
|
var ownersFollowed [][]byte
|
||||||
found = true
|
if ownersFollowed, err = s.SpiderFetch(
|
||||||
break
|
kinds.New(kind.FollowList), dontFetch, false, ownersPubkeys...,
|
||||||
}
|
); chk.E(err) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if !found {
|
// log.I.S(ownersFollowed)
|
||||||
|
log.I.F("getting followedFollows")
|
||||||
|
var followedFollows [][]byte
|
||||||
|
if followedFollows, err = s.SpiderFetch(
|
||||||
|
kinds.New(kind.FollowList), dontFetch, false, ownersFollowed...,
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.I.F("getting ownersMuted")
|
||||||
|
var ownersMuted [][]byte
|
||||||
|
if ownersMuted, err = s.SpiderFetch(
|
||||||
|
kinds.New(kind.MuteList), dontFetch, false, ownersPubkeys...,
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// remove the ownersFollowed and ownersMuted items from the followedFollows
|
||||||
|
// list
|
||||||
|
filteredFollows := make([][]byte, 0, len(followedFollows))
|
||||||
|
for _, follow := range followedFollows {
|
||||||
|
for _, owner := range ownersFollowed {
|
||||||
|
if bytes.Equal(follow, owner) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, owner := range ownersMuted {
|
||||||
|
if bytes.Equal(follow, owner) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
filteredFollows = append(filteredFollows, follow)
|
filteredFollows = append(filteredFollows, follow)
|
||||||
}
|
}
|
||||||
}
|
followedFollows = filteredFollows
|
||||||
followedFollows = filteredFollows
|
own := "owner"
|
||||||
own := "owner"
|
if len(ownersPubkeys) > 1 {
|
||||||
if len(ownersPubkeys) > 1 {
|
own = "owners"
|
||||||
own = "owners"
|
}
|
||||||
}
|
fol := "pubkey"
|
||||||
fol := "pubkey"
|
if len(ownersFollowed) > 1 {
|
||||||
if len(ownersFollowed) > 1 {
|
fol = "pubkeys"
|
||||||
fol = "pubkeys"
|
}
|
||||||
}
|
folfol := "pubkey"
|
||||||
folfol := "pubkey"
|
if len(followedFollows) > 1 {
|
||||||
if len(followedFollows) > 1 {
|
folfol = "pubkeys"
|
||||||
folfol = "pubkeys"
|
}
|
||||||
}
|
mut := "pubkey"
|
||||||
mut := "pubkey"
|
if len(ownersMuted) > 1 {
|
||||||
if len(ownersMuted) > 1 {
|
mut = "pubkeys"
|
||||||
mut = "pubkeys"
|
}
|
||||||
}
|
log.T.F(
|
||||||
log.T.F(
|
"found %d %s with a total of %d followed %s and %d followed's follows %s, and excluding %d owner muted %s",
|
||||||
"found %d %s with a total of %d followed %s and %d followed's follows %s, and excluding %d owner muted %s",
|
len(ownersPubkeys), own,
|
||||||
len(ownersPubkeys), own,
|
len(ownersFollowed), fol,
|
||||||
len(ownersFollowed), fol,
|
len(followedFollows), folfol,
|
||||||
len(followedFollows), folfol,
|
len(ownersMuted), mut,
|
||||||
len(ownersMuted), mut,
|
)
|
||||||
)
|
// add the owners to the ownersFollowed
|
||||||
// add the owners
|
ownersFollowed = append(ownersFollowed, ownersPubkeys...)
|
||||||
ownersFollowed = append(ownersFollowed, ownersPubkeys...)
|
s.SetOwnersPubkeys(ownersPubkeys)
|
||||||
s.SetOwnersPubkeys(ownersPubkeys)
|
s.SetOwnersFollowed(ownersFollowed)
|
||||||
s.SetOwnersFollowed(ownersFollowed)
|
s.SetFollowedFollows(followedFollows)
|
||||||
s.SetFollowedFollows(followedFollows)
|
s.SetOwnersMuted(ownersMuted)
|
||||||
s.SetOwnersMuted(ownersMuted)
|
// lastly, update users profile metadata and relay lists in the background
|
||||||
// lastly, update users profile metadata and relay lists in the background
|
if !dontFetch {
|
||||||
if !dontFetch {
|
go func() {
|
||||||
go func() {
|
everyone := append(ownersFollowed, followedFollows...)
|
||||||
everyone := append(ownersFollowed, followedFollows...)
|
s.SpiderFetch(
|
||||||
s.SpiderFetch(kind.ProfileMetadata, false, everyone...)
|
kinds.New(
|
||||||
s.SpiderFetch(kind.RelayListMetadata, false, everyone...)
|
kind.ProfileMetadata, kind.RelayListMetadata,
|
||||||
s.SpiderFetch(kind.DMRelaysList, false, everyone...)
|
kind.DMRelaysList,
|
||||||
}()
|
), false, true, everyone...,
|
||||||
}
|
)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,11 +80,6 @@ func (d *D) Import(r io.Reader) {
|
|||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
|
|
||||||
// TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *D) SetLogLevel(level string) {
|
func (d *D) SetLogLevel(level string) {
|
||||||
d.Logger.SetLogLevel(lol.GetLogLevel(level))
|
d.Logger.SetLogLevel(lol.GetLogLevel(level))
|
||||||
}
|
}
|
||||||
|
|||||||
98
pkg/database/export.go
Normal file
98
pkg/database/export.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
"io"
|
||||||
|
"orly.dev/pkg/database/indexes"
|
||||||
|
"orly.dev/pkg/database/indexes/types"
|
||||||
|
"orly.dev/pkg/encoders/codecbuf"
|
||||||
|
"orly.dev/pkg/encoders/event"
|
||||||
|
"orly.dev/pkg/utils/chk"
|
||||||
|
"orly.dev/pkg/utils/context"
|
||||||
|
"orly.dev/pkg/utils/units"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Export the complete database of stored events to an io.Writer in line structured minified
|
||||||
|
// JSON.
|
||||||
|
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
|
||||||
|
var err error
|
||||||
|
if len(pubkeys) == 0 {
|
||||||
|
if err = d.View(
|
||||||
|
func(txn *badger.Txn) (err error) {
|
||||||
|
buf := codecbuf.Get()
|
||||||
|
defer codecbuf.Put(buf)
|
||||||
|
if err = indexes.EventEnc(nil).MarshalWrite(buf); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: buf.Bytes()})
|
||||||
|
evB := make([]byte, 0, units.Mb)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
if evB, err = item.ValueCopy(evB); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
evBuf := bytes.NewBuffer(evB)
|
||||||
|
ev := event.New()
|
||||||
|
if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Serialize the event to JSON and write it to the output
|
||||||
|
if _, err = w.Write(ev.Serialize()); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err = w.Write([]byte{'\n'}); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for _, pubkey := range pubkeys {
|
||||||
|
if err = d.View(
|
||||||
|
func(txn *badger.Txn) (err error) {
|
||||||
|
pkBuf := codecbuf.Get()
|
||||||
|
defer codecbuf.Put(pkBuf)
|
||||||
|
ph := &types.PubHash{}
|
||||||
|
if err = ph.FromPubkey(pubkey); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = indexes.PubkeyEnc(
|
||||||
|
ph, nil, nil,
|
||||||
|
).MarshalWrite(pkBuf); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: pkBuf.Bytes()})
|
||||||
|
evB := make([]byte, 0, units.Mb)
|
||||||
|
defer it.Close()
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
if evB, err = item.ValueCopy(evB); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
evBuf := bytes.NewBuffer(evB)
|
||||||
|
ev := event.New()
|
||||||
|
if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Serialize the event to JSON and write it to the output
|
||||||
|
if _, err = w.Write(ev.Serialize()); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err = w.Write([]byte{'\n'}); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
111
pkg/database/export_test.go
Normal file
111
pkg/database/export_test.go
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"orly.dev/pkg/encoders/event"
|
||||||
|
"orly.dev/pkg/encoders/event/examples"
|
||||||
|
"orly.dev/pkg/utils/chk"
|
||||||
|
"orly.dev/pkg/utils/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestExport tests the Export function by:
|
||||||
|
// 1. Creating a new database with events from examples.Cache
|
||||||
|
// 2. Checking that all event IDs in the cache are found in the export
|
||||||
|
// 3. Verifying this also works when only a few pubkeys are requested
|
||||||
|
func TestExport(t *testing.T) {
|
||||||
|
// Create a temporary directory for the database
|
||||||
|
tempDir, err := os.MkdirTemp("", "test-db-*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tempDir) // Clean up after the test
|
||||||
|
|
||||||
|
// Create a context and cancel function for the database
|
||||||
|
ctx, cancel := context.Cancel(context.Bg())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Initialize the database
|
||||||
|
db, err := New(ctx, cancel, tempDir, "info")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create database: %v", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Create a scanner to read events from examples.Cache
|
||||||
|
scanner := bufio.NewScanner(bytes.NewBuffer(examples.Cache))
|
||||||
|
scanner.Buffer(make([]byte, 0, 1_000_000_000), 1_000_000_000)
|
||||||
|
|
||||||
|
// Maps to store event IDs and their associated pubkeys
|
||||||
|
eventIDs := make(map[string]bool)
|
||||||
|
pubkeyToEventIDs := make(map[string][]string)
|
||||||
|
|
||||||
|
// Process each event
|
||||||
|
for scanner.Scan() {
|
||||||
|
chk.E(scanner.Err())
|
||||||
|
b := scanner.Bytes()
|
||||||
|
ev := event.New()
|
||||||
|
|
||||||
|
// Unmarshal the event
|
||||||
|
if _, err = ev.Unmarshal(b); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the event to the database
|
||||||
|
if _, _, err = db.SaveEvent(ctx, ev); err != nil {
|
||||||
|
t.Fatalf("Failed to save event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the event ID
|
||||||
|
eventID := ev.IdString()
|
||||||
|
eventIDs[eventID] = true
|
||||||
|
|
||||||
|
// Store the event ID by pubkey
|
||||||
|
pubkey := ev.PubKeyString()
|
||||||
|
pubkeyToEventIDs[pubkey] = append(pubkeyToEventIDs[pubkey], eventID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for scanner errors
|
||||||
|
if err = scanner.Err(); err != nil {
|
||||||
|
t.Fatalf("Scanner error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Saved %d events to the database", len(eventIDs))
|
||||||
|
|
||||||
|
// Test 1: Export all events and verify all IDs are in the export
|
||||||
|
var exportBuffer bytes.Buffer
|
||||||
|
db.Export(ctx, &exportBuffer)
|
||||||
|
|
||||||
|
// Parse the exported events and check that all IDs are present
|
||||||
|
exportedIDs := make(map[string]bool)
|
||||||
|
exportScanner := bufio.NewScanner(&exportBuffer)
|
||||||
|
exportScanner.Buffer(make([]byte, 0, 1_000_000_000), 1_000_000_000)
|
||||||
|
exportCount := 0
|
||||||
|
for exportScanner.Scan() {
|
||||||
|
b := exportScanner.Bytes()
|
||||||
|
ev := event.New()
|
||||||
|
if _, err = ev.Unmarshal(b); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
exportedIDs[ev.IdString()] = true
|
||||||
|
exportCount++
|
||||||
|
}
|
||||||
|
// Check for scanner errors
|
||||||
|
if err = exportScanner.Err(); err != nil {
|
||||||
|
t.Fatalf("Scanner error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Found %d events in the export", exportCount)
|
||||||
|
|
||||||
|
// Check that all original event IDs are in the export
|
||||||
|
for id := range eventIDs {
|
||||||
|
if !exportedIDs[id] {
|
||||||
|
t.Errorf("Event ID %s not found in export", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("All %d event IDs found in export", len(eventIDs))
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"orly.dev/pkg/encoders/codecbuf"
|
"orly.dev/pkg/encoders/codecbuf"
|
||||||
"orly.dev/pkg/interfaces/store"
|
"orly.dev/pkg/interfaces/store"
|
||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
|
"orly.dev/pkg/utils/errorf"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
|
func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
|
||||||
@@ -53,5 +54,11 @@ func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
|
|||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if fidpk != nil {
|
||||||
|
err = errorf.E(
|
||||||
|
"failed to fetch full id pubkey by serial %d",
|
||||||
|
ser.Get(),
|
||||||
|
)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
|
|||||||
}
|
}
|
||||||
// fetch the events
|
// fetch the events
|
||||||
var ev *event.E
|
var ev *event.E
|
||||||
if ev, err = d.FetchEventBySerial(ser); chk.E(err) {
|
if ev, err = d.FetchEventBySerial(ser); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
evs = append(evs, ev)
|
evs = append(evs, ev)
|
||||||
@@ -46,7 +46,6 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
|
|||||||
} else {
|
} else {
|
||||||
var idPkTs []store.IdPkTs
|
var idPkTs []store.IdPkTs
|
||||||
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
|
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a map to store the latest version of replaceable events
|
// Create a map to store the latest version of replaceable events
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import (
|
|||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/errorf"
|
"orly.dev/pkg/utils/errorf"
|
||||||
"orly.dev/pkg/utils/log"
|
|
||||||
"sort"
|
"sort"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -38,7 +37,6 @@ func (d *D) SaveEvent(c context.T, ev *event.E) (kc, vc int, err error) {
|
|||||||
DTag: t.Value(),
|
DTag: t.Value(),
|
||||||
}
|
}
|
||||||
at := a.Marshal(nil)
|
at := a.Marshal(nil)
|
||||||
log.I.S(at)
|
|
||||||
if idxs, err = GetIndexesFromFilter(
|
if idxs, err = GetIndexesFromFilter(
|
||||||
&filter.F{
|
&filter.F{
|
||||||
Authors: tag.New(ev.Pubkey),
|
Authors: tag.New(ev.Pubkey),
|
||||||
|
|||||||
@@ -35,7 +35,20 @@ func NewChallengeWith[V string | []byte](challenge V) *Challenge {
|
|||||||
// Label returns the label of a authenvelope.Challenge.
|
// Label returns the label of a authenvelope.Challenge.
|
||||||
func (en *Challenge) Label() string { return L }
|
func (en *Challenge) Label() string { return L }
|
||||||
|
|
||||||
// Write the authenvelope.Challenge to a provided io.Writer.
|
// Write encodes and writes the Challenge instance to the provided writer.
|
||||||
|
//
|
||||||
|
// # Parameters
|
||||||
|
//
|
||||||
|
// - w (io.Writer): The destination where the encoded data will be written.
|
||||||
|
//
|
||||||
|
// # Return Values
|
||||||
|
//
|
||||||
|
// - err (error): An error if writing to the writer fails.
|
||||||
|
//
|
||||||
|
// # Expected behaviour
|
||||||
|
//
|
||||||
|
// Encodes the Challenge instance into a byte slice using Marshal, logs the
|
||||||
|
// encoded challenge, and writes it to the provided io.Writer.
|
||||||
func (en *Challenge) Write(w io.Writer) (err error) {
|
func (en *Challenge) Write(w io.Writer) (err error) {
|
||||||
var b []byte
|
var b []byte
|
||||||
b = en.Marshal(b)
|
b = en.Marshal(b)
|
||||||
@@ -44,8 +57,26 @@ func (en *Challenge) Write(w io.Writer) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Marshal a authenvelope.Challenge to minified JSON, appending to a provided destination
|
// Marshal encodes the Challenge instance into a byte slice, formatting it as
|
||||||
// slice. Note that this ensures correct string escaping on the challenge field.
|
// a JSON-like structure with a specific label and escaping rules applied to
|
||||||
|
// its content.
|
||||||
|
//
|
||||||
|
// # Parameters
|
||||||
|
//
|
||||||
|
// - dst ([]byte): The destination buffer where the encoded data will be written.
|
||||||
|
//
|
||||||
|
// # Return Values
|
||||||
|
//
|
||||||
|
// - b ([]byte): The byte slice containing the encoded Challenge data.
|
||||||
|
//
|
||||||
|
// # Expected behaviour
|
||||||
|
//
|
||||||
|
// - Prepares the destination buffer and applies a label to it.
|
||||||
|
//
|
||||||
|
// - Escapes the challenge content according to Nostr-specific rules before
|
||||||
|
// appending it to the output.
|
||||||
|
//
|
||||||
|
// - Returns the resulting byte slice with the complete encoded structure.
|
||||||
func (en *Challenge) Marshal(dst []byte) (b []byte) {
|
func (en *Challenge) Marshal(dst []byte) (b []byte) {
|
||||||
b = dst
|
b = dst
|
||||||
var err error
|
var err error
|
||||||
@@ -63,9 +94,24 @@ func (en *Challenge) Marshal(dst []byte) (b []byte) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal a authenvelope.Challenge from minified JSON, returning the remainder after the
|
// Unmarshal parses the provided byte slice and extracts the challenge value,
|
||||||
// end of the envelope. Note that this ensures the challenge string was
|
// leaving any remaining bytes after parsing.
|
||||||
// correctly escaped by NIP-01 escaping rules.
|
//
|
||||||
|
// # Parameters
|
||||||
|
//
|
||||||
|
// - b ([]byte): The byte slice containing the encoded challenge data.
|
||||||
|
//
|
||||||
|
// # Return Values
|
||||||
|
//
|
||||||
|
// - r ([]byte): Any remaining bytes after parsing the challenge.
|
||||||
|
//
|
||||||
|
// - err (error): An error if parsing fails.
|
||||||
|
//
|
||||||
|
// # Expected behaviour
|
||||||
|
//
|
||||||
|
// - Extracts the quoted challenge string from the input byte slice.
|
||||||
|
//
|
||||||
|
// - Trims any trailing characters following the closing quote.
|
||||||
func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) {
|
func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
r = b
|
r = b
|
||||||
if en.Challenge, r, err = text2.UnmarshalQuoted(r); chk.E(err) {
|
if en.Challenge, r, err = text2.UnmarshalQuoted(r); chk.E(err) {
|
||||||
@@ -80,8 +126,26 @@ func (en *Challenge) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseChallenge reads a authenvelope.Challenge encoded in minified JSON and unpacks it to
|
// ParseChallenge parses the provided byte slice into a new Challenge instance,
|
||||||
// the runtime format.
|
// extracting the challenge value and returning any remaining bytes after parsing.
|
||||||
|
//
|
||||||
|
// # Parameters
|
||||||
|
//
|
||||||
|
// - b ([]byte): The byte slice containing the encoded challenge data.
|
||||||
|
//
|
||||||
|
// # Return Values
|
||||||
|
//
|
||||||
|
// - t (*Challenge): A pointer to the newly created and populated Challenge
|
||||||
|
// instance.
|
||||||
|
//
|
||||||
|
// - rem ([]byte): Any remaining bytes in the input slice after parsing.
|
||||||
|
//
|
||||||
|
// - err (error): An error if parsing fails.
|
||||||
|
//
|
||||||
|
// # Expected behaviour
|
||||||
|
//
|
||||||
|
// Parses the byte slice into a new Challenge instance using Unmarshal,
|
||||||
|
// returning any remaining bytes and an error if parsing fails.
|
||||||
func ParseChallenge(b []byte) (t *Challenge, rem []byte, err error) {
|
func ParseChallenge(b []byte) (t *Challenge, rem []byte, err error) {
|
||||||
t = NewChallenge()
|
t = NewChallenge()
|
||||||
if rem, err = t.Unmarshal(b); chk.E(err) {
|
if rem, err = t.Unmarshal(b); chk.E(err) {
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
en.Event = event.New()
|
en.Event = event.New()
|
||||||
if r, err = en.Event.Unmarshal(r); chk.E(err) {
|
if r, err = en.Event.Unmarshal(r); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
||||||
@@ -158,7 +158,7 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
// envelope into it.
|
// envelope into it.
|
||||||
func ParseResult(b []byte) (t *Result, rem []byte, err error) {
|
func ParseResult(b []byte) (t *Result, rem []byte, err error) {
|
||||||
t = NewResult()
|
t = NewResult()
|
||||||
if rem, err = t.Unmarshal(b); chk.E(err) {
|
if rem, err = t.Unmarshal(b); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package event
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"github.com/minio/sha256-simd"
|
"github.com/minio/sha256-simd"
|
||||||
"io"
|
"io"
|
||||||
"orly.dev/pkg/crypto/ec/schnorr"
|
"orly.dev/pkg/crypto/ec/schnorr"
|
||||||
@@ -300,7 +301,7 @@ AfterClose:
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
invalid:
|
invalid:
|
||||||
err = errorf.E(
|
err = fmt.Errorf(
|
||||||
"invalid key,\n'%s'\n'%s'\n'%s'", string(b), string(b[:len(r)]),
|
"invalid key,\n'%s'\n'%s'\n'%s'", string(b), string(b[:len(r)]),
|
||||||
string(r),
|
string(r),
|
||||||
)
|
)
|
||||||
|
|||||||
422
pkg/encoders/event/json_tags_test.go
Normal file
422
pkg/encoders/event/json_tags_test.go
Normal file
@@ -0,0 +1,422 @@
|
|||||||
|
package event
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"orly.dev/pkg/encoders/kind"
|
||||||
|
"orly.dev/pkg/encoders/tag"
|
||||||
|
"orly.dev/pkg/encoders/tags"
|
||||||
|
text2 "orly.dev/pkg/encoders/text"
|
||||||
|
"orly.dev/pkg/encoders/timestamp"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// compareTags compares two tags and reports any differences
|
||||||
|
func compareTags(t *testing.T, expected, actual *tags.T, context string) {
|
||||||
|
if expected == nil && actual == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if expected == nil || actual == nil {
|
||||||
|
t.Errorf("%s: One of the tags is nil", context)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedSlice := expected.ToStringsSlice()
|
||||||
|
actualSlice := actual.ToStringsSlice()
|
||||||
|
|
||||||
|
if len(expectedSlice) != len(actualSlice) {
|
||||||
|
t.Errorf(
|
||||||
|
"%s: Tags length mismatch: expected %d, got %d", context,
|
||||||
|
len(expectedSlice), len(actualSlice),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, expectedTag := range expectedSlice {
|
||||||
|
actualTag := actualSlice[i]
|
||||||
|
|
||||||
|
if len(expectedTag) != len(actualTag) {
|
||||||
|
t.Errorf(
|
||||||
|
"%s: Tag[%d] length mismatch: expected %d, got %d", context, i,
|
||||||
|
len(expectedTag), len(actualTag),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for j, expectedElem := range expectedTag {
|
||||||
|
if expectedElem != actualTag[j] {
|
||||||
|
t.Errorf(
|
||||||
|
"%s: Tag[%d][%d] mismatch: expected '%s', got '%s'",
|
||||||
|
context, i, j, expectedElem, actualTag[j],
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestUnmarshalEscapedJSONInTags tests that the Unmarshal function correctly handles
|
||||||
|
// tags with fields containing escaped JSON that has been escaped using NostrEscape.
|
||||||
|
func TestUnmarshalEscapedJSONInTags(t *testing.T) {
|
||||||
|
// Test 1: Tag with a field containing escaped JSON
|
||||||
|
t.Run("SimpleEscapedJSON", func(t *testing.T) {
|
||||||
|
// Create a tag with a field containing JSON that needs escaping
|
||||||
|
jsonContent := `{"key":"value","nested":{"array":[1,2,3]}}`
|
||||||
|
|
||||||
|
// Create the event with the tag containing JSON
|
||||||
|
originalEvent := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("Event with JSON in tag"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a tag with JSON content
|
||||||
|
jsonTag := tag.New("j", jsonContent)
|
||||||
|
originalEvent.Tags.AppendTags(jsonTag)
|
||||||
|
|
||||||
|
// Marshal the event
|
||||||
|
marshaled := originalEvent.Marshal(nil)
|
||||||
|
|
||||||
|
// Unmarshal back into a new event
|
||||||
|
unmarshaledEvent := &E{}
|
||||||
|
_, err := unmarshaledEvent.Unmarshal(marshaled)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal event with JSON in tag: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the tag was correctly unmarshaled
|
||||||
|
if unmarshaledEvent.Tags.Len() != 1 {
|
||||||
|
t.Fatalf("Expected 1 tag, got %d", unmarshaledEvent.Tags.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
unmarshaledTag := unmarshaledEvent.Tags.GetTagElement(0)
|
||||||
|
if unmarshaledTag.Len() != 2 {
|
||||||
|
t.Fatalf("Expected tag with 2 elements, got %d", unmarshaledTag.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(unmarshaledTag.B(0)) != "j" {
|
||||||
|
t.Errorf("Expected tag key 'j', got '%s'", unmarshaledTag.B(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(unmarshaledTag.B(1)) != jsonContent {
|
||||||
|
t.Errorf("Expected tag value '%s', got '%s'", jsonContent, unmarshaledTag.B(1))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test 2: Tag with a field containing escaped JSON with special characters
|
||||||
|
t.Run("EscapedJSONWithSpecialChars", func(t *testing.T) {
|
||||||
|
// JSON with characters that need escaping: quotes, backslashes, control chars
|
||||||
|
jsonContent := `{"text":"This has \"quotes\" and \\ backslashes","newlines":"\n\r\t"}`
|
||||||
|
|
||||||
|
// Create the event with the tag containing JSON with special chars
|
||||||
|
originalEvent := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("Event with JSON containing special chars in tag"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a tag with JSON content containing special chars
|
||||||
|
jsonTag := tag.New("j", jsonContent)
|
||||||
|
originalEvent.Tags.AppendTags(jsonTag)
|
||||||
|
|
||||||
|
// Marshal the event
|
||||||
|
marshaled := originalEvent.Marshal(nil)
|
||||||
|
|
||||||
|
// Unmarshal back into a new event
|
||||||
|
unmarshaledEvent := &E{}
|
||||||
|
_, err := unmarshaledEvent.Unmarshal(marshaled)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal event with JSON containing special chars: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the tag was correctly unmarshaled
|
||||||
|
unmarshaledTag := unmarshaledEvent.Tags.GetTagElement(0)
|
||||||
|
if string(unmarshaledTag.B(1)) != jsonContent {
|
||||||
|
t.Errorf("Expected tag value '%s', got '%s'", jsonContent, unmarshaledTag.B(1))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test 3: Tag with nested JSON that contains already escaped content
|
||||||
|
t.Run("NestedEscapedJSON", func(t *testing.T) {
|
||||||
|
// JSON with already escaped content
|
||||||
|
jsonContent := `{"escaped":"This JSON contains \\\"already escaped\\\" content"}`
|
||||||
|
|
||||||
|
// Create the event with the tag containing nested escaped JSON
|
||||||
|
originalEvent := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("Event with nested escaped JSON in tag"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a tag with nested escaped JSON content
|
||||||
|
jsonTag := tag.New("j", jsonContent)
|
||||||
|
originalEvent.Tags.AppendTags(jsonTag)
|
||||||
|
|
||||||
|
// Marshal the event
|
||||||
|
marshaled := originalEvent.Marshal(nil)
|
||||||
|
|
||||||
|
// Unmarshal back into a new event
|
||||||
|
unmarshaledEvent := &E{}
|
||||||
|
_, err := unmarshaledEvent.Unmarshal(marshaled)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal event with nested escaped JSON: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the tag was correctly unmarshaled
|
||||||
|
unmarshaledTag := unmarshaledEvent.Tags.GetTagElement(0)
|
||||||
|
if string(unmarshaledTag.B(1)) != jsonContent {
|
||||||
|
t.Errorf("Expected tag value '%s', got '%s'", jsonContent, unmarshaledTag.B(1))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test 4: Tag with JSON that has been explicitly escaped using NostrEscape
|
||||||
|
t.Run("ExplicitlyEscapedJSON", func(t *testing.T) {
|
||||||
|
// Original JSON with characters that need escaping
|
||||||
|
originalJSON := []byte(`{"key":"value with "quotes"","nested":{"array":[1,2,3],"special":"\n\r\t"}}`)
|
||||||
|
|
||||||
|
// Explicitly escape the JSON using NostrEscape
|
||||||
|
escapedJSON := make([]byte, 0, len(originalJSON)*2)
|
||||||
|
escapedJSON = text2.NostrEscape(escapedJSON, originalJSON)
|
||||||
|
|
||||||
|
// Create the event with the tag containing explicitly escaped JSON
|
||||||
|
originalEvent := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("Event with explicitly escaped JSON in tag"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a tag with the explicitly escaped JSON content
|
||||||
|
jsonTag := tag.New("j", string(escapedJSON))
|
||||||
|
originalEvent.Tags.AppendTags(jsonTag)
|
||||||
|
|
||||||
|
// Marshal the event
|
||||||
|
marshaled := originalEvent.Marshal(nil)
|
||||||
|
|
||||||
|
// Unmarshal back into a new event
|
||||||
|
unmarshaledEvent := &E{}
|
||||||
|
_, err := unmarshaledEvent.Unmarshal(marshaled)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal event with explicitly escaped JSON: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the tag was correctly unmarshaled
|
||||||
|
unmarshaledTag := unmarshaledEvent.Tags.GetTagElement(0)
|
||||||
|
if string(unmarshaledTag.B(1)) != string(escapedJSON) {
|
||||||
|
t.Errorf("Expected tag value '%s', got '%s'", string(escapedJSON), unmarshaledTag.B(1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unescape the unmarshaled JSON to verify it matches the original
|
||||||
|
unescapedJSON := make([]byte, len(unmarshaledTag.B(1)))
|
||||||
|
copy(unescapedJSON, unmarshaledTag.B(1))
|
||||||
|
unescapedJSON = text2.NostrUnescape(unescapedJSON)
|
||||||
|
|
||||||
|
if string(unescapedJSON) != string(originalJSON) {
|
||||||
|
t.Errorf("Unescaped JSON doesn't match original. Expected '%s', got '%s'", string(originalJSON), string(unescapedJSON))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalTags(t *testing.T) {
|
||||||
|
// Test 1: Simple event with empty tags
|
||||||
|
t.Run(
|
||||||
|
"EmptyTags", func(t *testing.T) {
|
||||||
|
jsonWithEmptyTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
|
||||||
|
|
||||||
|
expected := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("This is a test event"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := &E{}
|
||||||
|
_, err := actual.Unmarshal(jsonWithEmptyTags)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal JSON with empty tags: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
compareTags(t, expected.Tags, actual.Tags, "EmptyTags")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test 2: Event with simple tags
|
||||||
|
t.Run(
|
||||||
|
"SimpleTags", func(t *testing.T) {
|
||||||
|
jsonWithSimpleTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[["e","1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"],["p","abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"]],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
|
||||||
|
|
||||||
|
expected := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("This is a test event"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add tags
|
||||||
|
eTag := tag.New(
|
||||||
|
"e",
|
||||||
|
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
|
||||||
|
)
|
||||||
|
pTag := tag.New(
|
||||||
|
"p",
|
||||||
|
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
|
||||||
|
)
|
||||||
|
expected.Tags.AppendTags(eTag, pTag)
|
||||||
|
|
||||||
|
actual := &E{}
|
||||||
|
_, err := actual.Unmarshal(jsonWithSimpleTags)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal JSON with simple tags: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
compareTags(t, expected.Tags, actual.Tags, "SimpleTags")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test 3: Event with complex tags (more elements per tag)
|
||||||
|
t.Run(
|
||||||
|
"ComplexTags", func(t *testing.T) {
|
||||||
|
jsonWithComplexTags := []byte(`{"id":"0101010101010101010101010101010101010101010101010101010101010101","pubkey":"0202020202020202020202020202020202020202020202020202020202020202","created_at":1609459200,"kind":1,"tags":[["e","1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef","wss://relay.example.com","root"],["p","abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890","wss://relay.example.com"],["t","hashtag","topic"]],"content":"This is a test event","sig":"03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"}`)
|
||||||
|
|
||||||
|
expected := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("This is a test event"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add tags
|
||||||
|
eTag := tag.New(
|
||||||
|
"e",
|
||||||
|
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
|
||||||
|
"wss://relay.example.com", "root",
|
||||||
|
)
|
||||||
|
pTag := tag.New(
|
||||||
|
"p",
|
||||||
|
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
|
||||||
|
"wss://relay.example.com",
|
||||||
|
)
|
||||||
|
tTag := tag.New("t", "hashtag", "topic")
|
||||||
|
expected.Tags.AppendTags(eTag, pTag, tTag)
|
||||||
|
|
||||||
|
actual := &E{}
|
||||||
|
_, err := actual.Unmarshal(jsonWithComplexTags)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to unmarshal JSON with complex tags: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
compareTags(t, expected.Tags, actual.Tags, "ComplexTags")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test 4: Test using the Unmarshal function (not the method)
|
||||||
|
t.Run(
|
||||||
|
"UnmarshalFunction", func(t *testing.T) {
|
||||||
|
jsonWithTags := []byte(`{
|
||||||
|
"id": "0101010101010101010101010101010101010101010101010101010101010101",
|
||||||
|
"pubkey": "0202020202020202020202020202020202020202020202020202020202020202",
|
||||||
|
"created_at": 1609459200,
|
||||||
|
"kind": 1,
|
||||||
|
"tags": [["e", "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"], ["p", "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"]],
|
||||||
|
"content": "This is a test event",
|
||||||
|
"sig": "03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"
|
||||||
|
}`)
|
||||||
|
|
||||||
|
expected := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("This is a test event"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add tags
|
||||||
|
eTag := tag.New(
|
||||||
|
"e",
|
||||||
|
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
|
||||||
|
)
|
||||||
|
pTag := tag.New(
|
||||||
|
"p",
|
||||||
|
"abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
|
||||||
|
)
|
||||||
|
expected.Tags.AppendTags(eTag, pTag)
|
||||||
|
|
||||||
|
actual := &E{}
|
||||||
|
_, err := Unmarshal(actual, jsonWithTags)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf(
|
||||||
|
"Failed to unmarshal JSON with tags using Unmarshal function: %v",
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
compareTags(t, expected.Tags, actual.Tags, "UnmarshalFunction")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test 5: Event with nested empty tags
|
||||||
|
t.Run(
|
||||||
|
"NestedEmptyTags", func(t *testing.T) {
|
||||||
|
jsonWithNestedEmptyTags := []byte(`{
|
||||||
|
"id": "0101010101010101010101010101010101010101010101010101010101010101",
|
||||||
|
"pubkey": "0202020202020202020202020202020202020202020202020202020202020202",
|
||||||
|
"created_at": 1609459200,
|
||||||
|
"kind": 1,
|
||||||
|
"tags": [[], ["e"], ["p", ""]],
|
||||||
|
"content": "This is a test event",
|
||||||
|
"sig": "03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303"
|
||||||
|
}`)
|
||||||
|
|
||||||
|
expected := &E{
|
||||||
|
Id: bytes.Repeat([]byte{0x01}, 32),
|
||||||
|
Pubkey: bytes.Repeat([]byte{0x02}, 32),
|
||||||
|
CreatedAt: timestamp.FromUnix(1609459200),
|
||||||
|
Kind: kind.TextNote,
|
||||||
|
Tags: tags.New(),
|
||||||
|
Content: []byte("This is a test event"),
|
||||||
|
Sig: bytes.Repeat([]byte{0x03}, 64),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add tags
|
||||||
|
emptyTag := tag.New[string]()
|
||||||
|
eTag := tag.New("e")
|
||||||
|
pTag := tag.New("p", "")
|
||||||
|
expected.Tags.AppendTags(emptyTag, eTag, pTag)
|
||||||
|
|
||||||
|
actual := &E{}
|
||||||
|
_, err := actual.Unmarshal(jsonWithNestedEmptyTags)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf(
|
||||||
|
"Failed to unmarshal JSON with nested empty tags: %v", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
compareTags(t, expected.Tags, actual.Tags, "NestedEmptyTags")
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -378,6 +378,7 @@ var Map = map[uint16]string{
|
|||||||
SearchRelaysList.K: "SearchRelaysList",
|
SearchRelaysList.K: "SearchRelaysList",
|
||||||
InterestsList.K: "InterestsList",
|
InterestsList.K: "InterestsList",
|
||||||
UserEmojiList.K: "UserEmojiList",
|
UserEmojiList.K: "UserEmojiList",
|
||||||
|
DMRelaysList.K: "DMRelaysList",
|
||||||
FileStorageServerList.K: "FileStorageServerList",
|
FileStorageServerList.K: "FileStorageServerList",
|
||||||
NWCWalletInfo.K: "NWCWalletInfo",
|
NWCWalletInfo.K: "NWCWalletInfo",
|
||||||
LightningPubRPC.K: "LightningPubRPC",
|
LightningPubRPC.K: "LightningPubRPC",
|
||||||
|
|||||||
@@ -20,12 +20,8 @@ type I interface {
|
|||||||
authedPubkey []byte, remote string,
|
authedPubkey []byte, remote string,
|
||||||
) (allowed *filters.T, accept bool, modified bool)
|
) (allowed *filters.T, accept bool, modified bool)
|
||||||
AddEvent(
|
AddEvent(
|
||||||
c context.T, rl relay.I, ev *event.E, hr *http.Request,
|
c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
|
||||||
origin string, authedPubkey []byte,
|
) (accepted bool, message []byte)
|
||||||
) (
|
|
||||||
accepted bool,
|
|
||||||
message []byte,
|
|
||||||
)
|
|
||||||
Context() context.T
|
Context() context.T
|
||||||
Publisher() *publish.S
|
Publisher() *publish.S
|
||||||
Publish(c context.T, evt *event.E) (err error)
|
Publish(c context.T, evt *event.E) (err error)
|
||||||
|
|||||||
43
pkg/protocol/auth/check-privilege.go
Normal file
43
pkg/protocol/auth/check-privilege.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"orly.dev/pkg/encoders/event"
|
||||||
|
"orly.dev/pkg/encoders/hex"
|
||||||
|
"orly.dev/pkg/encoders/tag"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CheckPrivilege(authedPubkey []byte, ev *event.E) (privileged bool) {
|
||||||
|
if ev.Kind.IsPrivileged() {
|
||||||
|
if len(authedPubkey) == 0 {
|
||||||
|
// this is a shortcut because none of the following
|
||||||
|
// tests would return true.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// authed users when auth is required must be present in the
|
||||||
|
// event if it is privileged.
|
||||||
|
privileged = bytes.Equal(ev.Pubkey, authedPubkey)
|
||||||
|
// if the authed pubkey matches the event author, it is
|
||||||
|
// allowed.
|
||||||
|
if !privileged {
|
||||||
|
// check whether one of the p (mention) tags is
|
||||||
|
// present designating the authed pubkey, as this means
|
||||||
|
// the author wants the designated pubkey to be able to
|
||||||
|
// access the event. this is the case for nip-4, nip-44
|
||||||
|
// DMs, and gift-wraps. The query would usually have
|
||||||
|
// been for precisely a p tag with their pubkey.
|
||||||
|
eTags := ev.Tags.GetAll(tag.New("p"))
|
||||||
|
var hexAuthedKey []byte
|
||||||
|
hex.EncAppend(hexAuthedKey, authedPubkey)
|
||||||
|
for _, e := range eTags.ToSliceOfTags() {
|
||||||
|
if bytes.Equal(e.Value(), hexAuthedKey) {
|
||||||
|
privileged = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
privileged = true
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
44
pkg/protocol/openapi/api-reference.js
Normal file
44
pkg/protocol/openapi/api-reference.js
Normal file
File diff suppressed because one or more lines are too long
62
pkg/protocol/openapi/huma.go
Normal file
62
pkg/protocol/openapi/huma.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package openapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/danielgtaylor/huma/v2"
|
||||||
|
"github.com/danielgtaylor/huma/v2/adapters/humago"
|
||||||
|
|
||||||
|
"orly.dev/pkg/protocol/servemux"
|
||||||
|
"orly.dev/pkg/utils/lol"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExposeMiddleware adds the http.Request and http.ResponseWriter to the context
|
||||||
|
// for the Operations handler.
|
||||||
|
func ExposeMiddleware(ctx huma.Context, next func(huma.Context)) {
|
||||||
|
lol.Tracer("ExposeMiddleware")
|
||||||
|
defer func() { lol.Tracer("end ExposeMiddleware") }()
|
||||||
|
// Unwrap the request and response objects.
|
||||||
|
r, w := humago.Unwrap(ctx)
|
||||||
|
ctx = huma.WithValue(ctx, "http-request", r)
|
||||||
|
ctx = huma.WithValue(ctx, "http-response", w)
|
||||||
|
next(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHuma creates a new huma.API with a Scalar docs UI, and a middleware that allows methods to
|
||||||
|
// access the http.Request and http.ResponseWriter.
|
||||||
|
func NewHuma(
|
||||||
|
router *servemux.S, name, version, description string,
|
||||||
|
) (api huma.API) {
|
||||||
|
lol.Tracer("NewHuma", name, version, description)
|
||||||
|
defer func() { lol.Tracer("end NewHuma") }()
|
||||||
|
config := huma.DefaultConfig(name, version)
|
||||||
|
config.Info.Description = description
|
||||||
|
config.DocsPath = ""
|
||||||
|
config.OpenAPIPath = "/api/openapi"
|
||||||
|
router.HandleFunc(
|
||||||
|
"/api", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/html")
|
||||||
|
w.Write(
|
||||||
|
[]byte(`<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<title>realy HTTP API UI</title>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<meta
|
||||||
|
name="viewport"
|
||||||
|
content="width=device-width, initial-scale=1" />
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<script
|
||||||
|
id="api-reference"
|
||||||
|
data-url="/api/openapi.json"></script>
|
||||||
|
<script src="https://cdn.jsdelivr.net/npm/@scalar/api-reference"></script>
|
||||||
|
</body>
|
||||||
|
</html>`),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
api = humago.New(router, config)
|
||||||
|
api.UseMiddleware(ExposeMiddleware)
|
||||||
|
return
|
||||||
|
}
|
||||||
27
pkg/protocol/openapi/openapi.go
Normal file
27
pkg/protocol/openapi/openapi.go
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package openapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/danielgtaylor/huma/v2"
|
||||||
|
|
||||||
|
"orly.dev/pkg/interfaces/server"
|
||||||
|
"orly.dev/pkg/protocol/servemux"
|
||||||
|
"orly.dev/pkg/utils/lol"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Operations struct {
|
||||||
|
server.I
|
||||||
|
path string
|
||||||
|
*servemux.S
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new openapi.Operations and registers its methods.
|
||||||
|
func New(
|
||||||
|
s server.I, name, version, description string, path string,
|
||||||
|
sm *servemux.S,
|
||||||
|
) {
|
||||||
|
lol.Tracer("New", name, version, description, path)
|
||||||
|
defer func() { lol.Tracer("end New") }()
|
||||||
|
a := NewHuma(sm, name, version, description)
|
||||||
|
huma.AutoRegister(a, &Operations{I: s, path: path})
|
||||||
|
return
|
||||||
|
}
|
||||||
@@ -161,16 +161,6 @@ func (a *A) HandleEvent(
|
|||||||
if len(split) != 3 {
|
if len(split) != 3 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Check if the deletion event is trying to delete itself
|
|
||||||
if bytes.Equal(split[2], env.E.Id) {
|
|
||||||
if err = Ok.Blocked(
|
|
||||||
a, env,
|
|
||||||
"deletion event cannot reference its own ID",
|
|
||||||
); chk.E(err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var pk []byte
|
var pk []byte
|
||||||
if pk, err = hex.DecAppend(nil, split[1]); chk.E(err) {
|
if pk, err = hex.DecAppend(nil, split[1]); chk.E(err) {
|
||||||
if err = Ok.Invalid(
|
if err = Ok.Invalid(
|
||||||
@@ -185,7 +175,8 @@ func (a *A) HandleEvent(
|
|||||||
kin := ints.New(uint16(0))
|
kin := ints.New(uint16(0))
|
||||||
if _, err = kin.Unmarshal(split[0]); chk.E(err) {
|
if _, err = kin.Unmarshal(split[0]); chk.E(err) {
|
||||||
if err = Ok.Invalid(
|
if err = Ok.Invalid(
|
||||||
a, env, "delete event a tag kind value invalid: %s",
|
a, env, "delete event a tag kind value "+
|
||||||
|
"invalid: %s",
|
||||||
t.Value(),
|
t.Value(),
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
@@ -195,7 +186,8 @@ func (a *A) HandleEvent(
|
|||||||
kk := kind.New(kin.Uint16())
|
kk := kind.New(kin.Uint16())
|
||||||
if kk.Equal(kind.Deletion) {
|
if kk.Equal(kind.Deletion) {
|
||||||
if err = Ok.Blocked(
|
if err = Ok.Blocked(
|
||||||
a, env, "delete event kind may not be deleted",
|
a, env, "delete event kind may not be "+
|
||||||
|
"deleted",
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -204,7 +196,8 @@ func (a *A) HandleEvent(
|
|||||||
if !kk.IsParameterizedReplaceable() {
|
if !kk.IsParameterizedReplaceable() {
|
||||||
if err = Ok.Error(
|
if err = Ok.Error(
|
||||||
a, env,
|
a, env,
|
||||||
"delete tags with a tags containing non-parameterized-replaceable events can't be processed",
|
"delete tags with a tags containing "+
|
||||||
|
"non-parameterized-replaceable events can't be processed",
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -325,9 +318,7 @@ func (a *A) HandleEvent(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
var reason []byte
|
var reason []byte
|
||||||
ok, reason = srv.AddEvent(
|
ok, reason = srv.AddEvent(c, rl, env.E, a.Req(), a.RealRemote())
|
||||||
c, rl, env.E, a.Req(), a.RealRemote(), a.Listener.AuthedPubkey(),
|
|
||||||
)
|
|
||||||
log.I.F("event %0x added %v, %s", env.E.Id, ok, reason)
|
log.I.F("event %0x added %v, %s", env.E.Id, ok, reason)
|
||||||
if err = okenvelope.NewFrom(env.E.Id, ok).Write(a.Listener); chk.E(err) {
|
if err = okenvelope.NewFrom(env.E.Id, ok).Write(a.Listener); chk.E(err) {
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import (
|
|||||||
// corresponding handler method, generates a notice for errors or unknown types,
|
// corresponding handler method, generates a notice for errors or unknown types,
|
||||||
// logs the notice, and writes it back to the listener if required.
|
// logs the notice, and writes it back to the listener if required.
|
||||||
func (a *A) HandleMessage(msg []byte) {
|
func (a *A) HandleMessage(msg []byte) {
|
||||||
|
log.T.F("%s received message:\n%s", a.Listener.RealRemote(), string(msg))
|
||||||
var notice []byte
|
var notice []byte
|
||||||
var err error
|
var err error
|
||||||
var t string
|
var t string
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package socketapi
|
package socketapi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
"orly.dev/pkg/encoders/envelopes/closedenvelope"
|
"orly.dev/pkg/encoders/envelopes/closedenvelope"
|
||||||
@@ -9,9 +8,8 @@ import (
|
|||||||
"orly.dev/pkg/encoders/envelopes/eventenvelope"
|
"orly.dev/pkg/encoders/envelopes/eventenvelope"
|
||||||
"orly.dev/pkg/encoders/envelopes/reqenvelope"
|
"orly.dev/pkg/encoders/envelopes/reqenvelope"
|
||||||
"orly.dev/pkg/encoders/event"
|
"orly.dev/pkg/encoders/event"
|
||||||
"orly.dev/pkg/encoders/hex"
|
|
||||||
"orly.dev/pkg/encoders/tag"
|
|
||||||
"orly.dev/pkg/interfaces/server"
|
"orly.dev/pkg/interfaces/server"
|
||||||
|
"orly.dev/pkg/protocol/auth"
|
||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/log"
|
"orly.dev/pkg/utils/log"
|
||||||
@@ -80,8 +78,7 @@ func (a *A) HandleReq(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if events, err = sto.QueryEvents(c, f); chk.E(err) {
|
if events, err = sto.QueryEvents(c, f); err != nil {
|
||||||
log.E.F("eventstore: %v", err)
|
|
||||||
if errors.Is(err, badger.ErrDBClosed) {
|
if errors.Is(err, badger.ErrDBClosed) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -91,39 +88,14 @@ func (a *A) HandleReq(
|
|||||||
if srv.AuthRequired() {
|
if srv.AuthRequired() {
|
||||||
var tmp event.S
|
var tmp event.S
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
if ev.Kind.IsPrivileged() {
|
if !auth.CheckPrivilege(a.Listener.AuthedPubkey(), ev) {
|
||||||
authedPubkey := a.Listener.AuthedPubkey()
|
log.W.F(
|
||||||
if len(authedPubkey) == 0 {
|
"not privileged %0x ev pubkey %0x ev pubkey %0x kind %s privileged: %v",
|
||||||
// this is a shortcut because none of the following
|
a.Listener.AuthedPubkey(), ev.Pubkey,
|
||||||
// tests would return true.
|
a.Listener.AuthedPubkey(), ev.Kind.Name(),
|
||||||
continue
|
ev.Kind.IsPrivileged(),
|
||||||
}
|
)
|
||||||
// authed users when auth is required must be present in the
|
continue
|
||||||
// event if it is privileged.
|
|
||||||
authedIsAuthor := bytes.Equal(ev.Pubkey, authedPubkey)
|
|
||||||
// if the authed pubkey matches the event author, it is
|
|
||||||
// allowed.
|
|
||||||
if !authedIsAuthor {
|
|
||||||
// check whether one of the p (mention) tags is
|
|
||||||
// present designating the authed pubkey, as this means
|
|
||||||
// the author wants the designated pubkey to be able to
|
|
||||||
// access the event. this is the case for nip-4, nip-44
|
|
||||||
// DMs, and gift-wraps. The query would usually have
|
|
||||||
// been for precisely a p tag with their pubkey.
|
|
||||||
eTags := ev.Tags.GetAll(tag.New("p"))
|
|
||||||
var hexAuthedKey []byte
|
|
||||||
hex.EncAppend(hexAuthedKey, authedPubkey)
|
|
||||||
var authedIsMentioned bool
|
|
||||||
for _, e := range eTags.ToSliceOfTags() {
|
|
||||||
if bytes.Equal(e.Value(), hexAuthedKey) {
|
|
||||||
authedIsMentioned = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !authedIsMentioned {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
tmp = append(tmp, ev)
|
tmp = append(tmp, ev)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,19 +5,16 @@ import (
|
|||||||
"orly.dev/pkg/encoders/event"
|
"orly.dev/pkg/encoders/event"
|
||||||
"orly.dev/pkg/encoders/filters"
|
"orly.dev/pkg/encoders/filters"
|
||||||
"orly.dev/pkg/interfaces/publisher"
|
"orly.dev/pkg/interfaces/publisher"
|
||||||
|
"orly.dev/pkg/interfaces/server"
|
||||||
|
"orly.dev/pkg/protocol/auth"
|
||||||
"orly.dev/pkg/protocol/ws"
|
"orly.dev/pkg/protocol/ws"
|
||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/log"
|
"orly.dev/pkg/utils/log"
|
||||||
"regexp"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
const Type = "socketapi"
|
const Type = "socketapi"
|
||||||
|
|
||||||
var (
|
|
||||||
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Map is a map of filters associated with a collection of ws.Listener
|
// Map is a map of filters associated with a collection of ws.Listener
|
||||||
// connections.
|
// connections.
|
||||||
type Map map[*ws.Listener]map[string]*filters.T
|
type Map map[*ws.Listener]map[string]*filters.T
|
||||||
@@ -57,11 +54,13 @@ type S struct {
|
|||||||
Mx sync.Mutex
|
Mx sync.Mutex
|
||||||
// Map is the map of subscribers and subscriptions from the websocket api.
|
// Map is the map of subscribers and subscriptions from the websocket api.
|
||||||
Map
|
Map
|
||||||
|
// Server is an interface to the server.
|
||||||
|
Server server.I
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ publisher.I = &S{}
|
var _ publisher.I = &S{}
|
||||||
|
|
||||||
func New() (publisher *S) { return &S{Map: make(Map)} }
|
func New(s server.I) (publisher *S) { return &S{Map: make(Map), Server: s} }
|
||||||
|
|
||||||
func (p *S) Type() (typeName string) { return Type }
|
func (p *S) Type() (typeName string) { return Type }
|
||||||
|
|
||||||
@@ -98,6 +97,7 @@ func (p *S) Receive(msg publisher.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.Mx.Lock()
|
p.Mx.Lock()
|
||||||
|
defer p.Mx.Unlock()
|
||||||
if subs, ok := p.Map[m.Listener]; !ok {
|
if subs, ok := p.Map[m.Listener]; !ok {
|
||||||
subs = make(map[string]*filters.T)
|
subs = make(map[string]*filters.T)
|
||||||
subs[m.Id] = m.Filters
|
subs[m.Id] = m.Filters
|
||||||
@@ -112,54 +112,56 @@ func (p *S) Receive(msg publisher.Message) {
|
|||||||
"added subscription %s for %s", m.Id, m.Listener.RealRemote(),
|
"added subscription %s for %s", m.Id, m.Listener.RealRemote(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
p.Mx.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deliver sends an event to all subscribers whose filters match the event
|
// Deliver processes and distributes an event to all matching subscribers based on their filter configurations.
|
||||||
//
|
//
|
||||||
// # Parameters
|
// # Parameters
|
||||||
//
|
//
|
||||||
// - ev (*event.E): The event to deliver to matching subscribers
|
// - ev (*event.E): The event to be delivered to subscribed clients.
|
||||||
//
|
//
|
||||||
// # Expected behaviour
|
// # Expected behaviour
|
||||||
//
|
//
|
||||||
// # Locks the mutex to synchronize access to subscriber data
|
// Delivers the event to all subscribers whose filters match the event. It
|
||||||
//
|
// applies authentication checks if required by the server, and skips delivery
|
||||||
// # Iterates over all websocket connections and their associated subscriptions
|
// for unauthenticated users when events are privileged.
|
||||||
//
|
|
||||||
// # Checks if each subscription's filter matches the event being delivered
|
|
||||||
//
|
|
||||||
// # Creates an event envelope result for matching subscriptions
|
|
||||||
//
|
|
||||||
// # Writes the result to the corresponding websocket connection
|
|
||||||
//
|
|
||||||
// Logs details about event delivery and any errors encountered
|
|
||||||
func (p *S) Deliver(ev *event.E) {
|
func (p *S) Deliver(ev *event.E) {
|
||||||
log.T.F("delivering event %0x to subscribers", ev.Id)
|
log.T.F("delivering event %0x to subscribers", ev.Id)
|
||||||
var err error
|
var err error
|
||||||
p.Mx.Lock()
|
p.Mx.Lock()
|
||||||
|
defer p.Mx.Unlock()
|
||||||
for w, subs := range p.Map {
|
for w, subs := range p.Map {
|
||||||
log.I.F("%v %s", subs, w.RealRemote())
|
// log.I.F("%v %s", subs, w.RealRemote())
|
||||||
for id, subscriber := range subs {
|
for id, subscriber := range subs {
|
||||||
log.T.F(
|
// log.T.F(
|
||||||
"subscriber %s\n%s", w.RealRemote(),
|
// "subscriber %s\n%s", w.RealRemote(),
|
||||||
subscriber.Marshal(nil),
|
// subscriber.Marshal(nil),
|
||||||
)
|
// )
|
||||||
if !subscriber.Match(ev) {
|
if !subscriber.Match(ev) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var res *eventenvelope.Result
|
if p.Server.AuthRequired() {
|
||||||
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
|
if !auth.CheckPrivilege(w.AuthedPubkey(), ev) {
|
||||||
continue
|
log.W.F(
|
||||||
|
"not privileged %0x ev pubkey %0x ev pubkey %0x kind %s privileged: %v",
|
||||||
|
w.AuthedPubkey(), ev.Pubkey,
|
||||||
|
w.AuthedPubkey(), ev.Kind.Name(),
|
||||||
|
ev.Kind.IsPrivileged(),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var res *eventenvelope.Result
|
||||||
|
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = res.Write(w); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.T.F("dispatched event %0x to subscription %s", ev.Id, id)
|
||||||
}
|
}
|
||||||
if err = res.Write(w); chk.E(err) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.T.F("dispatched event %0x to subscription %s", ev.Id, id)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.Mx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeSubscriberId removes a specific subscription from a subscriber
|
// removeSubscriberId removes a specific subscription from a subscriber
|
||||||
|
|||||||
@@ -234,9 +234,10 @@ func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
|
|||||||
// general message reader loop
|
// general message reader loop
|
||||||
go func() {
|
go func() {
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
|
var err error
|
||||||
for {
|
for {
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
if err := conn.ReadMessage(r.connectionContext, buf); chk.T(err) {
|
if err = conn.ReadMessage(r.connectionContext, buf); err != nil {
|
||||||
r.ConnectionError = err
|
r.ConnectionError = err
|
||||||
r.Close()
|
r.Close()
|
||||||
break
|
break
|
||||||
@@ -270,10 +271,12 @@ func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
|
|||||||
}
|
}
|
||||||
r.challenge = env.Challenge
|
r.challenge = env.Challenge
|
||||||
case eventenvelope.L:
|
case eventenvelope.L:
|
||||||
|
// log.I.F("message: %s", message)
|
||||||
env := eventenvelope.NewResult()
|
env := eventenvelope.NewResult()
|
||||||
if env, message, err = eventenvelope.ParseResult(message); chk.E(err) {
|
if env, message, err = eventenvelope.ParseResult(message); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// log.I.F("%s", env.Event.Marshal(nil))
|
||||||
if len(env.Subscription.T) == 0 {
|
if len(env.Subscription.T) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -497,12 +500,13 @@ func (r *Client) PrepareSubscription(
|
|||||||
return sub
|
return sub
|
||||||
}
|
}
|
||||||
|
|
||||||
// QuerySync is only used in tests. The realy query method is synchronous now
|
// QuerySync is only used in tests. The relay query method is synchronous now
|
||||||
// anyway (it ensures sort order is respected).
|
// anyway (it ensures sort order is respected).
|
||||||
func (r *Client) QuerySync(
|
func (r *Client) QuerySync(
|
||||||
ctx context.T, f *filter.F,
|
ctx context.T, f *filter.F,
|
||||||
opts ...SubscriptionOption,
|
opts ...SubscriptionOption,
|
||||||
) ([]*event.E, error) {
|
) ([]*event.E, error) {
|
||||||
|
// log.T.F("QuerySync:\n%s", f.Marshal(nil))
|
||||||
sub, err := r.Subscribe(ctx, filters.New(f), opts...)
|
sub, err := r.Subscribe(ctx, filters.New(f), opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"compress/flate"
|
"compress/flate"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"fmt"
|
||||||
"github.com/gobwas/httphead"
|
"github.com/gobwas/httphead"
|
||||||
"github.com/gobwas/ws"
|
"github.com/gobwas/ws"
|
||||||
"github.com/gobwas/ws/wsflate"
|
"github.com/gobwas/ws/wsflate"
|
||||||
@@ -113,7 +113,10 @@ func NewConnection(
|
|||||||
func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
||||||
select {
|
select {
|
||||||
case <-c.Done():
|
case <-c.Done():
|
||||||
return errors.New("context canceled")
|
return errorf.E(
|
||||||
|
"%s context canceled",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if cn.msgStateW.IsCompressed() && cn.enableCompression {
|
if cn.msgStateW.IsCompressed() && cn.enableCompression {
|
||||||
@@ -121,19 +124,35 @@ func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
|||||||
if _, err := io.Copy(
|
if _, err := io.Copy(
|
||||||
cn.flateWriter, bytes.NewReader(data),
|
cn.flateWriter, bytes.NewReader(data),
|
||||||
); chk.T(err) {
|
); chk.T(err) {
|
||||||
return errorf.E("failed to write message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to write message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cn.flateWriter.Close(); chk.T(err) {
|
if err := cn.flateWriter.Close(); chk.T(err) {
|
||||||
return errorf.E("failed to close flate writer: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to close flate writer: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
|
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
|
||||||
return errorf.E("failed to write message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to write message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := cn.writer.Flush(); chk.T(err) {
|
if err := cn.writer.Flush(); chk.T(err) {
|
||||||
return errorf.E("failed to flush writer: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to flush writer: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -143,34 +162,57 @@ func (cn *Connection) ReadMessage(c context.T, buf io.Writer) (err error) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.Done():
|
case <-c.Done():
|
||||||
return errors.New("context canceled")
|
return errorf.D(
|
||||||
|
"%s context canceled",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
h, err := cn.reader.NextFrame()
|
h, err := cn.reader.NextFrame()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cn.conn.Close()
|
cn.conn.Close()
|
||||||
return errorf.E("failed to advance frame: %w", err)
|
return fmt.Errorf(
|
||||||
|
"%s failed to advance frame: %s",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
if h.OpCode.IsControl() {
|
if h.OpCode.IsControl() {
|
||||||
if err := cn.controlHandler(h, cn.reader); chk.T(err) {
|
if err := cn.controlHandler(h, cn.reader); chk.T(err) {
|
||||||
return errorf.E("failed to handle control frame: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to handle control frame: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else if h.OpCode == ws.OpBinary ||
|
} else if h.OpCode == ws.OpBinary ||
|
||||||
h.OpCode == ws.OpText {
|
h.OpCode == ws.OpText {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := cn.reader.Discard(); chk.T(err) {
|
if err := cn.reader.Discard(); chk.T(err) {
|
||||||
return errorf.E("failed to discard: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to discard: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cn.msgStateR.IsCompressed() && cn.enableCompression {
|
if cn.msgStateR.IsCompressed() && cn.enableCompression {
|
||||||
cn.flateReader.Reset(cn.reader)
|
cn.flateReader.Reset(cn.reader)
|
||||||
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
|
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
|
||||||
return errorf.E("failed to read message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to read message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := io.Copy(buf, cn.reader); chk.T(err) {
|
if _, err := io.Copy(buf, cn.reader); chk.T(err) {
|
||||||
return errorf.E("failed to read message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to read message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
"orly.dev/pkg/utils/chk"
|
"orly.dev/pkg/utils/chk"
|
||||||
"orly.dev/pkg/utils/context"
|
"orly.dev/pkg/utils/context"
|
||||||
"orly.dev/pkg/utils/errorf"
|
"orly.dev/pkg/utils/errorf"
|
||||||
"orly.dev/pkg/utils/log"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@@ -158,7 +157,6 @@ func (sub *Subscription) Close() {
|
|||||||
closeMsg := closeenvelope.NewFrom(id)
|
closeMsg := closeenvelope.NewFrom(id)
|
||||||
var b []byte
|
var b []byte
|
||||||
b = closeMsg.Marshal(nil)
|
b = closeMsg.Marshal(nil)
|
||||||
log.T.F("{%s} sending %s", sub.Relay.URL, b)
|
|
||||||
<-sub.Relay.Write(b)
|
<-sub.Relay.Write(b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user