massive cleanup

moves one type into its own package, but no actual code changes, just grouping and sorting
This commit is contained in:
2024-11-30 12:10:39 +00:00
parent 3efd0ce9f5
commit 4f695ff517
14 changed files with 983 additions and 1212 deletions

0
_util.go Normal file
View File

1
go.mod
View File

@@ -4,6 +4,7 @@ go 1.23.1
require (
github.com/alexflint/go-arg v1.5.1
github.com/dave/dst v0.27.3
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v4 v4.3.1
github.com/fasthttp/websocket v1.5.10

11
go.sum
View File

@@ -22,6 +22,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/dave/dst v0.27.3 h1:P1HPoMza3cMEquVf9kKy8yXsFirry4zEnWOdYPOoIzY=
github.com/dave/dst v0.27.3/go.mod h1:jHh6EOibnHgcUW3WjKHisiooEkYwqpHLBSX1iOBhEyc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -81,8 +83,6 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20241023014458-598669927662 h1:SKMkD83p7FwUqKmBsPdLHF5dNyxq3jOWwu9w9UyH5vA=
github.com/google/pprof v0.0.0-20241023014458-598669927662/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b h1:SXO0REt4iu865upYCk8aKBBJQ4BqoE0ReP23ClMu60s=
github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -95,8 +95,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/kopoli/go-terminal-size v0.0.0-20170219200355-5c97524c8b54 h1:0SMHxjkLKNawqUjjnMlCtEdj6uWZjv0+qDZ3F6GOADI=
@@ -159,8 +157,6 @@ golang.org/x/exp/typeparams v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:AbB0pIl
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20241112194109-818c5a804067 h1:adDmSQyFTCiv19j015EGKJBoaa7ElV0Q1Wovb/4G7NA=
golang.org/x/lint v0.0.0-20241112194109-818c5a804067/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
@@ -194,7 +190,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
@@ -238,8 +233,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@@ -55,6 +55,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWs
github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
github.com/dave/jennifer v1.5.0 h1:HmgPN93bVDpkQyYbqhCHj5QlgvUkvEOzMyEvKLgCRrg=
github.com/dave/jennifer v1.5.0/go.mod h1:4MnyiFIlZS3l5tSDn8VnzE6ffAhYBMB2SZntBsZGUok=
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
@@ -115,6 +117,8 @@ github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf
github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
@@ -131,7 +135,6 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM=
github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
@@ -166,3 +169,5 @@ google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/src-d/go-billy.v4 v4.3.2 h1:0SQA1pRztfTFx2miS8sA97XvooFeNOmvUenF4o0EcVg=
gopkg.in/src-d/go-billy.v4 v4.3.2/go.mod h1:nDjArDMp+XMs1aFAESLRjfGSgfvoYN0hDfzEk0GjC98=

View File

@@ -1,86 +0,0 @@
package realy
import (
"fmt"
"net/http"
"regexp"
"strings"
"realy.lol/event"
"realy.lol/normalize"
"realy.lol/relay"
"realy.lol/relay/wrapper"
"realy.lol/store"
)
var nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
// AddEvent has a business rule to add an event to the event store
func AddEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, origin S,
authedPubkey B) (accepted bool, message B) {
if ev == nil {
return false, normalize.Invalid.F("empty event")
}
sto := rl.Storage(c)
wrapper := &wrapper.RelayWrapper{I: sto}
advancedSaver, _ := sto.(relay.AdvancedSaver)
accept, notice, after := rl.AcceptEvent(c, ev, hr, origin, authedPubkey)
if !accept {
return false, normalize.Blocked.F(notice)
}
if ev.Tags.ContainsProtectedMarker() {
if len(authedPubkey) == 0 || !equals(ev.PubKey, authedPubkey) {
return false, B(fmt.Sprintf(
"event with relay marker tag '-' may only be published by matching npub: %0x is not %0x",
authedPubkey, ev.PubKey))
}
}
if ev.Kind.IsEphemeral() {
// do not store ephemeral events
} else {
if advancedSaver != nil {
advancedSaver.BeforeSave(c, ev)
}
if saveErr := wrapper.Publish(c, ev); chk.E(saveErr) {
switch saveErr {
case store.ErrDupEvent:
return false, normalize.Error.F(saveErr.Error())
default:
errmsg := saveErr.Error()
if nip20prefixmatcher.MatchString(errmsg) {
if strings.Contains(errmsg, "tombstone") {
return false, normalize.Blocked.F(
"event was deleted, not storing it again")
}
return false, normalize.Error.F(errmsg)
} else {
return false, normalize.Error.F("failed to save (%s)", errmsg)
}
}
// } else {
// log.D.F("saved event %s", ev.Serialize())
}
if advancedSaver != nil {
advancedSaver.AfterSave(ev)
}
}
var authRequired bool
if ar, ok := rl.(relay.Authenticator); ok {
authRequired = ar.AuthEnabled()
}
// if the AcceptEvent function returned a closure we run it after the publish
// has been done because at least for now this means it is an updated follow
// list of an owner or their follows
if after != nil {
after()
}
notifyListeners(authRequired, ev)
accepted = true
return
}

View File

@@ -1,118 +0,0 @@
package realy
import (
"crypto/subtle"
"fmt"
"io"
"net/http"
"strings"
"realy.lol/cmd/realy/app"
"realy.lol/context"
"realy.lol/hex"
"realy.lol/sha256"
)
func (s *Server) HTTPAuth(r *http.Request) (authed bool) {
username, password, ok := r.BasicAuth()
if ok {
// Calculate SHA-256 hashes for the provided and expected
// usernames and passwords.
usernameHash := sha256.Sum256(B(username))
passwordHash := sha256.Sum256(B(password))
expectedUsernameHash := sha256.Sum256(B(s.adminUser))
expectedPasswordHash := sha256.Sum256(B(s.adminPass))
// Use the subtle.ConstantTimeCompare() function to check if
// the provided username and password hashes equal the
// expected username and password hashes. ConstantTimeCompare
// will return 1 if the values are equal, or 0 otherwise.
// Importantly, we should to do the work to evaluate both the
// username and password before checking the return values to
// avoid leaking information.
usernameMatch := subtle.ConstantTimeCompare(usernameHash[:],
expectedUsernameHash[:]) == 1
passwordMatch := subtle.ConstantTimeCompare(passwordHash[:],
expectedPasswordHash[:]) == 1
if usernameMatch && passwordMatch {
return true
}
}
return
}
func (s *Server) AuthFail(w http.ResponseWriter) {
w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
}
func (s *Server) HandleAdmin(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasPrefix(r.URL.Path, "/export"):
if ok := s.HTTPAuth(r); !ok {
s.AuthFail(w)
return
}
log.I.F("export of event data requested on admin port")
store := s.relay.Storage(context.Bg())
if strings.Count(r.URL.Path, "/") > 1 {
split := strings.Split(r.URL.Path, "/")
// there should be 3 for a valid path, an empty, "export" and the final parameter
if len(split) != 3 {
fmt.Fprintf(w, "incorrectly formatted export parameter: '%s'",
r.URL.Path)
return
}
switch split[2] {
case "users":
// todo: naughty reaching through interface here lol... but the relay
// implementation does have this feature and another impl may not. Perhaps add
// a new interface for grabbing the relay's allowed list, and rename things to
// be more clear. And add a method for fetching such a relay's allowed writers.
if rl, ok := s.relay.(*app.Relay); ok {
follows := make([]B, 0, len(rl.Followed))
for f := range rl.Followed {
follows = append(follows, B(f))
}
store.Export(s.Ctx, w, follows...)
}
default:
// this should be a hyphen separated list of hexadecimal pubkey values
var exportPubkeys []B
pubkeys := strings.Split(split[2], "-")
for _, pubkey := range pubkeys {
// check they are valid hex
pk, err := hex.Dec(pubkey)
if err != nil {
log.E.F("invalid public key '%s' in parameters", pubkey)
continue
}
exportPubkeys = append(exportPubkeys, pk)
}
store.Export(s.Ctx, w, exportPubkeys...)
}
} else {
store.Export(s.Ctx, w)
}
case strings.HasPrefix(r.URL.Path, "/import"):
if ok := s.HTTPAuth(r); !ok {
s.AuthFail(w)
return
}
log.I.F("import of event data requested on admin port %s", r.RequestURI)
store := s.relay.Storage(context.Bg())
read := io.LimitReader(r.Body, r.ContentLength)
store.Import(read)
case strings.HasPrefix(r.URL.Path, "/shutdown"):
if ok := s.HTTPAuth(r); !ok {
s.AuthFail(w)
return
}
fmt.Fprintf(w, "shutting down")
defer r.Body.Close()
s.Shutdown()
default:
fmt.Fprintf(w, "todo: realy web interface page\n\n")
s.HandleNIP11(w, r)
}
}

View File

@@ -1,59 +0,0 @@
package realy
import (
_ "embed"
"encoding/json"
"net/http"
"realy.lol/context"
"realy.lol/relay"
ri "realy.lol/relayinfo"
"realy.lol/store"
)
//go:embed version
var version S
func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
log.T.Ln("handling relay information document")
var info *ri.T
if informationer, ok := s.relay.(relay.Informationer); ok {
info = informationer.GetNIP11InformationDocument()
} else {
// 1, 11, 42, 70, 86, 9
supportedNIPs := ri.GetList(
ri.BasicProtocol,
ri.EventDeletion,
ri.RelayInformationDocument,
ri.GenericTagQueries,
ri.NostrMarketplace,
ri.EventTreatment,
ri.CommandResults,
ri.ParameterizedReplaceableEvents,
ri.ProtectedEvents,
)
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok && auther.ServiceUrl(r) != "" {
supportedNIPs = append(supportedNIPs, ri.Authentication.N())
}
var storage store.I
if s.relay.Storage(context.Bg()) != nil {
if _, ok = storage.(relay.EventCounter); ok {
supportedNIPs = append(supportedNIPs, ri.CountingResults.N())
}
}
log.T.Ln("supported NIPs", supportedNIPs)
info = &ri.T{
Name: s.relay.Name(),
Description: "relay powered by the realy framework",
Nips: supportedNIPs,
Software: "https://realy.lol",
Version: version,
Limitation: ri.Limits{MaxLimit: s.maxLimit, AuthRequired: s.authRequired},
Icon: "https://cdn.satellite.earth/ac9778868fbf23b63c47c769a74e163377e6ea94d3f0f31711931663d035c4f6.png",
}
}
if err := json.NewEncoder(w).Encode(info); chk.E(err) {
}
}

View File

@@ -1,853 +0,0 @@
package realy
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"net/http"
"sort"
"time"
"realy.lol/kinds"
"realy.lol/units"
"github.com/dgraph-io/badger/v4"
"github.com/fasthttp/websocket"
"golang.org/x/time/rate"
"realy.lol/auth"
"realy.lol/bech32encoding"
"realy.lol/ec/bech32"
"realy.lol/envelopes"
"realy.lol/envelopes/authenvelope"
"realy.lol/envelopes/closedenvelope"
"realy.lol/envelopes/closeenvelope"
"realy.lol/envelopes/countenvelope"
"realy.lol/envelopes/eoseenvelope"
"realy.lol/envelopes/eventenvelope"
"realy.lol/envelopes/noticeenvelope"
"realy.lol/envelopes/okenvelope"
"realy.lol/envelopes/reqenvelope"
"realy.lol/event"
"realy.lol/filter"
"realy.lol/hex"
"realy.lol/ints"
"realy.lol/kind"
"realy.lol/normalize"
"realy.lol/relay"
"realy.lol/sha256"
"realy.lol/store"
"realy.lol/tag"
"realy.lol/web"
)
// TODO: consider moving these to Server as config params
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = pongWait / 2
// Maximum message size allowed from peer.
maxMessageSize = 1 * units.Mb
)
// TODO: consider moving these to Server as config params
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
const ChallengeLength = 16
const ChallengeHRP = "nchal"
func challenge(conn *websocket.Conn, req *http.Request, addr string) (ws *web.Socket) {
var err error
// create a new challenge for this connection
cb := make([]byte, ChallengeLength)
if _, err = rand.Read(cb); chk.E(err) {
// i never know what to do for this case, panic? usually just ignore, it should never happen
panic(err)
}
var b5 B
if b5, err = bech32encoding.ConvertForBech32(cb); chk.E(err) {
return
}
var encoded B
if encoded, err = bech32.Encode(bech32.B(ChallengeHRP), b5); chk.E(err) {
return
}
ws = web.NewSocket(conn, req, encoded)
return
}
func (s *Server) handleMessage(c Ctx, ws *web.Socket, msg B, sto store.I) {
var notice B
var err E
var t S
var rem B
if t, rem, err = envelopes.Identify(msg); chk.E(err) {
notice = B(err.Error())
}
switch t {
case eventenvelope.L:
notice = s.doEvent(c, ws, rem, sto)
case countenvelope.L:
notice = s.doCount(c, ws, rem, sto)
case reqenvelope.L:
notice = s.doReq(c, ws, rem, sto)
case closeenvelope.L:
notice = s.doClose(c, ws, rem, sto)
case authenvelope.L:
notice = s.doAuth(c, ws, rem, sto)
default:
if cwh, ok := s.relay.(relay.WebSocketHandler); ok {
cwh.HandleUnknownType(ws, t, rem)
} else {
notice = B(fmt.Sprintf("unknown envelope type %s\n%s", t, rem))
}
}
if len(notice) > 0 {
log.D.F("notice %s", notice)
if err = noticeenvelope.NewFrom(notice).Write(ws); chk.E(err) {
}
}
}
func (s *Server) doEvent(c Ctx, ws *web.Socket, req B, sto store.I) (msg B) {
log.T.F("doEvent %s %s", ws.RealRemote(), req)
var err E
var ok bool
var rem B
advancedDeleter, _ := sto.(relay.AdvancedDeleter)
env := eventenvelope.NewSubmission()
if rem, err = env.UnmarshalJSON(req); chk.E(err) {
return
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
}
accept, notice, after := s.relay.AcceptEvent(c, env.T, ws.Req(), ws.RealRemote(),
B(ws.Authed()))
if !accept {
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
if !ws.AuthRequested() {
if err = okenvelope.NewFrom(env.ID, false,
normalize.AuthRequired.F("auth required for request processing")).
Write(ws); chk.T(err) {
}
log.T.F("requesting auth from client %s", ws.RealRemote())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) {
return
}
ws.RequestAuth()
return
} else {
if err = okenvelope.NewFrom(env.ID, false,
normalize.AuthRequired.F("auth required for storing events")).
Write(ws); chk.T(err) {
}
log.T.F("requesting auth again from client %s", ws.RealRemote())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) {
return
}
return
}
}
// return an ok event containing any notice returned
if err = okenvelope.NewFrom(env.ID, false,
normalize.Invalid.F(notice)).Write(ws); chk.T(err) {
}
return
}
// check id
if !equals(env.GetIDBytes(), env.ID) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Invalid.F("event id is computed incorrectly")).Write(ws); chk.E(err) {
return
}
return
}
// check signature
if ok, err = env.Verify(); chk.T(err) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("failed to verify signature")).Write(ws); chk.E(err) {
return
}
} else if !ok {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("signature is invalid")).Write(ws); chk.E(err) {
return
}
return
}
if env.T.Kind.K == kind.Deletion.K {
// we only handle e and a tag deletes because kind based deletes are too indiscriminate.
log.I.F("delete event\n%s", env.T.Serialize())
// event deletion -- nip-09
for _, t := range env.Tags.Value() {
var res []*event.T
if t.Len() >= 2 {
switch {
case equals(t.Key(), B("e")):
evId := make(B, sha256.Size)
if _, err = hex.DecBytes(evId, t.Value()); chk.E(err) {
continue
}
// fetch event to be deleted
res, err = s.relay.Storage(c).
QueryEvents(c, &filter.T{IDs: tag.New(evId)})
if err != nil {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) {
return
}
return
}
for i := range res {
if res[i].Kind.Equal(kind.Deletion) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Blocked.F(
"not processing or storing delete event containing delete event references")).
Write(ws); chk.E(err) {
return
}
}
if !equals(res[i].PubKey, env.T.PubKey) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Blocked.F(
"cannot delete other users' events")).
Write(ws); chk.E(err) {
return
}
}
}
case equals(t.Key(), B("a")):
split := bytes.Split(t.Value(), B{':'})
if len(split) != 3 {
continue
}
kin := ints.New(uint16(0))
if _, err = kin.UnmarshalJSON(split[0]); chk.E(err) {
return
}
kk := kind.New(kin.Uint16())
if kk.Equal(kind.Deletion) {
// we don't delete delete events, period
if err = okenvelope.NewFrom(env.ID, false,
normalize.Blocked.F(
"delete event kind may not be deleted")).
Write(ws); chk.E(err) {
return
}
}
// if the kind is not parameterised replaceable, the tag is invalid and the
// delete event will not be saved.
if !kk.IsParameterizedReplaceable() {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F(
"delete tags with a tags containing non-parameterized-replaceable events cannot be processed")).
Write(ws); chk.E(err) {
return
}
}
// for this event kind, the second field of the tag value MUST be the pubkey
// of the author of the event
if !equals(split[1], env.T.PubKey) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Blocked.F(
"cannot delete other users' events")).
Write(ws); chk.E(err) {
return
}
}
f := filter.New()
f.Kinds.K = []*kind.T{kk}
aut := make(B, 0, len(split[1])/2)
if aut, err = hex.DecAppend(aut, split[1]); chk.E(err) {
return
}
f.Authors.Append(aut)
f.Tags.AppendTags(tag.New(B{'#', 'd'}, split[2]))
res, err = s.relay.Storage(c).QueryEvents(c, f)
if err != nil {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) {
return
}
return
}
}
}
if len(res) < 1 {
// this will happen if event is not in the database
continue
}
// filter out any events that are newer than the delete request, deletes only work
// backwards, old delete events might match newer events for a tags.
var resTmp []*event.T
for _, v := range res {
if env.T.CreatedAt.U64() >= v.CreatedAt.U64() {
resTmp = append(resTmp, v)
}
}
res = resTmp
for _, target := range res {
if target.Kind.K == kind.Deletion.K {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("cannot delete delete event %s",
env.ID)).Write(ws); chk.E(err) {
return
}
}
if target.CreatedAt.Int() > env.T.CreatedAt.Int() {
log.I.F("not deleting\n%d%\nbecause delete event is older\n%d",
target.CreatedAt.Int(), env.T.CreatedAt.Int())
continue
}
// check if this can be deleted
if !equals(target.PubKey, env.PubKey) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F("only author can delete event")).Write(ws); chk.E(err) {
return
}
return
}
if advancedDeleter != nil {
advancedDeleter.BeforeDelete(c, t.Value(), env.PubKey)
}
// delete the event
if err = sto.DeleteEvent(c, target.EventID()); chk.T(err) {
if err = okenvelope.NewFrom(env.ID, false,
normalize.Error.F(err.Error())).Write(ws); chk.E(err) {
return
}
return
}
if advancedDeleter != nil {
advancedDeleter.AfterDelete(t.Value(), env.PubKey)
}
}
res = nil
}
if err = okenvelope.NewFrom(env.ID, true).Write(ws); chk.E(err) {
return
}
// if the event is a deletion we still want to save it.
}
ok, reason := AddEvent(c, s.relay, env.T, ws.Req(), ws.RealRemote(), B(ws.Authed()))
if err = okenvelope.NewFrom(env.ID, ok, reason).Write(ws); chk.E(err) {
return
}
if after != nil {
after()
}
return
}
func (s *Server) doCount(c context.Context, ws *web.Socket, req B,
store store.I) (msg B) {
counter, ok := store.(relay.EventCounter)
if !ok {
return normalize.Restricted.F("this relay does not support NIP-45")
}
if ws.AuthRequested() && len(ws.Authed()) == 0 {
// ignore requests until request is responded to
return
}
var err E
var rem B
env := countenvelope.New()
if rem, err = env.UnmarshalJSON(req); chk.E(err) {
return normalize.Error.F(err.Error())
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
}
if env.Subscription == nil || env.Subscription.String() == "" {
return normalize.Error.F("COUNT has no <subscription id>")
}
allowed := env.Filters
if accepter, ok := s.relay.(relay.ReqAcceptor); ok {
var accepted bool
allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters,
B(ws.Authed()))
if !accepted || allowed == nil {
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok &&
auther.AuthEnabled() && !ws.AuthRequested() {
ws.RequestAuth()
if err = closedenvelope.NewFrom(env.Subscription,
normalize.AuthRequired.F("auth required for count processing")).
Write(ws); chk.E(err) {
}
log.I.F("requesting auth from client from %s", ws.RealRemote())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
return
}
}
}
if allowed != env.Filters {
defer func() {
// send out an auth request after processing filter if the filter was modified
var auther relay.Authenticator
var ok bool
if auther, ok = s.relay.(relay.Authenticator); ok &&
auther.AuthEnabled() && !ws.AuthRequested() {
ws.RequestAuth()
if err = closedenvelope.NewFrom(env.Subscription,
normalize.AuthRequired.F("auth required for request processing")).
Write(ws); chk.E(err) {
}
log.T.F("requesting auth from client from %s, challenge '%s'",
ws.RealRemote(), ws.Challenge())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
return
}
}()
}
var total N
var approx bool
for _, f := range allowed.F {
// prevent kind-4 events from being returned to unauthed users, only when
// authentication is a thing
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
if f.Kinds.Contains(kind.EncryptedDirectMessage) || f.Kinds.Contains(kind.GiftWrap) {
senders := f.Authors
receivers := f.Tags.GetAll(tag.New("p"))
switch {
case len(ws.Authed()) == 0:
// not authenticated
return normalize.Restricted.F(
"this realy does not serve kind-4 to unauthenticated users," +
" does your client implement NIP-42?")
case senders.Len() == 1 &&
receivers.Len() < 2 &&
equals(senders.F()[0], B(ws.Authed())):
// allowed filter: ws.authed is sole sender (filter specifies one or all
// receivers)
case receivers.Len() == 1 &&
senders.Len() < 2 &&
equals(receivers.N(0).Value(), B(ws.Authed())):
// allowed filter: ws.authed is sole receiver (filter specifies one or all
// senders)
default:
// restricted filter: do not return any events, even if other elements in
// filters array were not restricted). client should know better.
return normalize.Restricted.F("authenticated user does not have" +
" authorization for requested filters")
}
}
}
var count N
count, approx, err = counter.CountEvents(c, f)
if err != nil {
log.E.F("store: %v", err)
continue
}
total += count
}
var res *countenvelope.Response
if res, err = countenvelope.NewResponseFrom(env.Subscription.String(), N(total),
approx); chk.E(err) {
return
}
if err = res.Write(ws); chk.E(err) {
return
}
return
}
func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) {
if ws.AuthRequested() && len(ws.Authed()) == 0 {
// ignore requests until request is responded to
return
}
var err E
var rem B
env := reqenvelope.New()
if rem, err = env.UnmarshalJSON(req); chk.E(err) {
return normalize.Error.F(err.Error())
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
}
allowed := env.Filters
if accepter, ok := s.relay.(relay.ReqAcceptor); ok {
var accepted bool
allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters,
B(ws.Authed()))
if !accepted || allowed == nil {
var auther relay.Authenticator
if auther, ok = s.relay.(relay.Authenticator); ok &&
auther.AuthEnabled() && !ws.AuthRequested() {
ws.RequestAuth()
if err = closedenvelope.NewFrom(env.Subscription,
normalize.AuthRequired.F("auth required for request processing")).
Write(ws); chk.E(err) {
}
log.T.F("requesting auth from client from %s, challenge '%s'",
ws.RealRemote(), ws.Challenge())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
return
}
return
}
}
if allowed != env.Filters {
defer func() {
// send out an auth request after processing filter if the filter was modified
var auther relay.Authenticator
var ok bool
if auther, ok = s.relay.(relay.Authenticator); ok &&
auther.AuthEnabled() && !ws.AuthRequested() {
ws.RequestAuth()
if err = closedenvelope.NewFrom(env.Subscription,
normalize.AuthRequired.F("auth required for request processing")).
Write(ws); chk.E(err) {
}
log.T.F("requesting auth from client from %s, challenge '%s'",
ws.RealRemote(), ws.Challenge())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
return
}
}()
}
for _, f := range allowed.F {
var i uint
if filter.Present(f.Limit) {
if *f.Limit == 0 {
continue
}
i = *f.Limit
}
// prevent kind-4 events from being returned to unauthed users,
// only when authentication is a thing
if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
if f.Kinds.IsPrivileged() {
log.T.F("privileged request with auth enabled\n%s", f.Serialize())
senders := f.Authors
receivers := f.Tags.GetAll(tag.New("#p"))
// log.I.S(senders, receivers)
// log.I.F("%0x\n%s\nsender:%v\nreceiver:%v", ws.AuthedBytes(), f.Serialize(), senders.Contains(ws.AuthedBytes()),
// receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes())))
switch {
case len(ws.Authed()) == 0:
ws.RequestAuth()
if err = closedenvelope.NewFrom(env.Subscription,
normalize.AuthRequired.F("auth required for request processing")).
Write(ws); chk.E(err) {
}
log.I.F("requesting auth from client from %s", ws.RealRemote())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
// not authenticated
notice := normalize.Restricted.F(
"this realy does not serve kind-4 to unauthenticated users," +
" does your client implement NIP-42?")
return notice
case senders.Contains(ws.AuthedBytes()) ||
receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes())):
log.T.F("user %0x from %s allowed to query for privileged event",
ws.AuthedBytes(), ws.RealRemote())
default:
// restricted filter: do not return any events,
// even if other elements in filters array were not restricted).
// client should know better.
return normalize.Restricted.F("authenticated user %0x does not have"+
" authorization for requested filters", ws.AuthedBytes())
}
}
}
var events event.Ts
log.D.F("query from %s %0x,%s", ws.RealRemote(), ws.AuthedBytes(), f.Serialize())
if events, err = sto.QueryEvents(c, f); err != nil {
log.E.F("eventstore: %v", err)
if errors.Is(err, badger.ErrDBClosed) {
return
}
continue
}
// filter out events from authors in the user's mute list, because they are literally a
// waste of bandwidth from a user's perspective
if aut := ws.Authed(); ws.IsAuthed() {
var mutes event.Ts
if mutes, err = sto.QueryEvents(c, &filter.T{Authors: tag.New(aut),
Kinds: kinds.New(kind.MuteList)}); !chk.E(err) {
// found the users mute list, now, generate the list so we can filter on it
var mutePubs []B
for _, ev := range mutes {
for _, t := range ev.Tags.F() {
if equals(t.Key(), B("p")) {
var p B
if p, err = hex.Dec(S(t.Value())); chk.E(err) {
continue
}
mutePubs = append(mutePubs, p)
}
}
}
var tmp event.Ts
for _, ev := range events {
for _, pk := range mutePubs {
if equals(ev.PubKey, pk) {
continue
}
tmp = append(tmp, ev)
}
}
events = tmp
}
}
// sort in reverse chronological order
sort.Slice(events, func(i, j int) bool {
return events[i].CreatedAt.Int() > events[j].CreatedAt.Int()
})
for _, ev := range events {
if s.options.skipEventFunc != nil && s.options.skipEventFunc(ev) {
continue
}
i--
if i < 0 {
break
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(env.Subscription.T, ev); chk.E(err) {
return
}
if err = res.Write(ws); chk.E(err) {
return
}
}
}
if err = eoseenvelope.NewFrom(env.Subscription).Write(ws); chk.E(err) {
return
}
if env.Filters != allowed {
// don't add a subscription if the user is not authed but auth is required
return
}
setListener(env.Subscription.String(), ws, env.Filters)
return
}
func (s *Server) doClose(c Ctx, ws *web.Socket, req B, store store.I) (note B) {
var err E
var rem B
env := closeenvelope.New()
if rem, err = env.UnmarshalJSON(req); chk.E(err) {
return B(err.Error())
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
}
if env.ID.String() == "" {
return B("CLOSE has no <id>")
}
removeListenerId(ws, env.ID.String())
return
}
func (s *Server) doAuth(c Ctx, ws *web.Socket, req B, store store.I) (msg B) {
if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() {
svcUrl := auther.ServiceUrl(ws.Req())
if svcUrl == "" {
return
}
log.T.F("received auth response,%s", req)
var err E
var rem B
env := authenvelope.NewResponse()
if rem, err = env.UnmarshalJSON(req); chk.E(err) {
return
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
}
var valid bool
if valid, err = auth.Validate(env.Event, B(ws.Challenge()), svcUrl); chk.E(err) {
if err := okenvelope.NewFrom(env.Event.ID, false,
normalize.Error.F(err.Error())).Write(ws); chk.E(err) {
return B(err.Error())
}
return normalize.Error.F(err.Error())
} else if !valid {
if err = okenvelope.NewFrom(env.Event.ID, false,
normalize.Error.F("failed to authenticate")).Write(ws); chk.E(err) {
return B(err.Error())
}
return normalize.Restricted.F("auth response does not validate")
} else {
if err = okenvelope.NewFrom(env.Event.ID, true, B{}).Write(ws); chk.E(err) {
return
}
log.D.F("%s authed to pubkey,%0x", ws.RealRemote(), env.Event.PubKey)
ws.SetAuthed(S(env.Event.PubKey))
}
}
return
}
func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.E.F("failed to upgrade websocket: %v", err)
return
}
s.clientsMu.Lock()
defer s.clientsMu.Unlock()
s.clients[conn] = struct{}{}
ticker := time.NewTicker(pingPeriod)
ip := conn.RemoteAddr().String()
var realIP S
if realIP = r.Header.Get("X-Forwarded-For"); realIP != "" {
ip = realIP // possible to be multiple comma separated
} else if realIP = r.Header.Get("X-Real-Ip"); realIP != "" {
ip = realIP
}
log.T.F("connected from %s", ip)
ws := challenge(conn, r, ip)
if s.options.perConnectionLimiter != nil {
ws.SetLimiter(rate.NewLimiter(
s.options.perConnectionLimiter.Limit(),
s.options.perConnectionLimiter.Burst(),
))
}
ctx, cancel := context.WithCancel(context.Background())
store := s.relay.Storage(ctx)
// reader
go func() {
defer func() {
cancel()
ticker.Stop()
s.clientsMu.Lock()
if _, ok := s.clients[conn]; ok {
conn.Close()
delete(s.clients, conn)
removeListener(ws)
}
s.clientsMu.Unlock()
// log.T.F("disconnected from %s", ip)
}()
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(S) E {
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
if ws.AuthRequested() && len(ws.Authed()) == 0 {
log.I.F("requesting auth from client from %s", ws.RealRemote())
if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) {
return
}
// ignore requests until request is responded to
return
}
var message B
var typ N
for {
typ, message, err = conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
) {
log.W.F("unexpected close error from %s: %v",
r.Header.Get("X-Forwarded-For"), err)
}
break
}
if ws.Limiter() != nil {
// NOTE: Wait will throttle the requests.
// To reject requests exceeding the limit, use if !ws.limiter.Allow()
if err := ws.Limiter().Wait(context.TODO()); chk.T(err) {
log.W.F("unexpected limiter error %v", err)
continue
}
}
if typ == websocket.PingMessage {
if err = ws.WriteMessage(websocket.PongMessage, nil); chk.E(err) {
// probably should abort if error here?
}
continue
}
go s.handleMessage(ctx, ws, message, store)
}
}()
// writer
go func() {
defer func() {
cancel()
ticker.Stop()
conn.Close()
}()
var err E
for {
select {
case <-ticker.C:
err = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait))
if err != nil {
log.E.F("error writing ping: %v; closing websocket", err)
return
}
ws.RealRemote()
case <-ctx.Done():
return
}
}
}()
}

View File

@@ -1,42 +1,79 @@
package realy
import (
"crypto/rand"
"net/http"
"regexp"
"sync"
"time"
"github.com/fasthttp/websocket"
"realy.lol/bech32encoding"
"realy.lol/ec/bech32"
"realy.lol/envelopes/eventenvelope"
"realy.lol/event"
"realy.lol/filters"
"realy.lol/tag"
"realy.lol/units"
"realy.lol/web"
)
type Listener struct {
filters *filters.T
}
type (
Listener struct{ filters *filters.T }
)
const (
ChallengeHRP = "nchal"
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = pongWait / 2
maxMessageSize = 1 * units.Mb
ChallengeLength = 16
)
var (
nip20prefixmatcher = regexp.MustCompile(`^\w+: `)
upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
}}
listeners = make(map[*web.Socket]map[S]*Listener)
listenersMutex sync.Mutex
)
func challenge(conn *websocket.Conn, req *http.Request, addr string) (ws *web.Socket) {
var err error
cb := make([]byte, ChallengeLength)
if _, err = rand.Read(cb); chk.E(err) {
panic(err)
}
var b5 B
if b5, err = bech32encoding.ConvertForBech32(cb); chk.E(err) {
return
}
var encoded B
if encoded, err = bech32.Encode(bech32.B(ChallengeHRP), b5); chk.E(err) {
return
}
ws = web.NewSocket(conn, req, encoded)
return
}
func setListener(id S, ws *web.Socket, ff *filters.T) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
subs, ok := listeners[ws]
if !ok {
subs = make(map[S]*Listener)
listeners[ws] = subs
}
subs[id] = &Listener{filters: ff}
}
// Remove a specific subscription id from listeners for a given ws client
func removeListenerId(ws *web.Socket, id S) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
if subs, ok := listeners[ws]; ok {
delete(listeners[ws], id)
if len(subs) == 0 {
@@ -45,7 +82,6 @@ func removeListenerId(ws *web.Socket, id S) {
}
}
// Remove T conn from listeners
func removeListener(ws *web.Socket) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
@@ -55,7 +91,6 @@ func removeListener(ws *web.Socket) {
func notifyListeners(authRequired bool, ev *event.T) {
if ev == nil {
// nothing to do
return
}
var err E
@@ -69,7 +104,6 @@ func notifyListeners(authRequired bool, ev *event.T) {
if !listener.filters.Match(ev) {
continue
}
// is the subscriber authorized to see privileged event?
if ev.Kind.IsPrivileged() {
ab := ws.AuthedBytes()
var containsPubkey bool

30
realy/options/options.go Normal file
View File

@@ -0,0 +1,30 @@
package options
import (
"golang.org/x/time/rate"
"realy.lol/event"
)
type T struct {
PerConnectionLimiter *rate.Limiter
SkipEventFunc func(*event.T) bool
}
type O func(*T)
func Default() *T {
return &T{}
}
func WithPerConnectionLimiter(rps rate.Limit, burst N) O {
return func(o *T) {
o.PerConnectionLimiter = rate.NewLimiter(rps, burst)
}
}
func WithSkipEventFunc(skipEventFunc func(*event.T) bool) O {
return func(o *T) {
o.SkipEventFunc = skipEventFunc
}
}

21
realy/options/util.go Normal file
View File

@@ -0,0 +1,21 @@
package options
import (
"bytes"
"realy.lol/context"
"realy.lol/lol"
)
type (
B = []byte
S = string
E = error
N = int
Ctx = context.T
)
var (
log, chk, errorf = lol.Main.Log, lol.Main.Check, lol.Main.Errorf
equals = bytes.Equal
)

File diff suppressed because it is too large Load Diff

6
realy/version.go Normal file
View File

@@ -0,0 +1,6 @@
package realy
import _ "embed"
//go:embed version
var version S

View File

@@ -21,13 +21,13 @@ type RelayInterface interface {
QuerySync(c Ctx, f *filter.T, opts ...ws.SubscriptionOption) ([]*event.T, E)
}
type RelayWrapper struct {
type Relay struct {
store.I
}
var _ RelayInterface = (*RelayWrapper)(nil)
var _ RelayInterface = (*Relay)(nil)
func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) {
func (w Relay) Publish(c Ctx, evt *event.T) (err E) {
if evt.Kind.IsEphemeral() {
// do not store ephemeral events
return nil
@@ -127,7 +127,7 @@ func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) {
return
}
func (w RelayWrapper) QuerySync(c Ctx, f *filter.T,
func (w Relay) QuerySync(c Ctx, f *filter.T,
opts ...ws.SubscriptionOption) ([]*event.T, E) {
evs, err := w.I.QueryEvents(c, f)