From 4f695ff51733f64e01ecb44f33f19daeb11a3e8d Mon Sep 17 00:00:00 2001 From: mleku Date: Sat, 30 Nov 2024 12:10:39 +0000 Subject: [PATCH] massive cleanup moves one type into its own package, but no actual code changes, just grouping and sorting --- _util.go | 0 go.mod | 1 + go.sum | 11 +- go.work.sum | 7 +- realy/add-event.go | 86 --- realy/admin.go | 118 ---- realy/handlerelayinfo.go | 59 -- realy/handlers.go | 853 ---------------------------- realy/{listener.go => merged.go} | 54 +- realy/options/options.go | 30 + realy/options/util.go | 21 + realy/server.go | 941 ++++++++++++++++++++++++++++--- realy/version.go | 6 + relay/wrapper/relay_interface.go | 8 +- 14 files changed, 983 insertions(+), 1212 deletions(-) create mode 100644 _util.go delete mode 100644 realy/add-event.go delete mode 100644 realy/admin.go delete mode 100644 realy/handlerelayinfo.go delete mode 100644 realy/handlers.go rename realy/{listener.go => merged.go} (62%) create mode 100644 realy/options/options.go create mode 100644 realy/options/util.go create mode 100644 realy/version.go diff --git a/_util.go b/_util.go new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod index b41eeed..7281ff6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23.1 require ( github.com/alexflint/go-arg v1.5.1 + github.com/dave/dst v0.27.3 github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/badger/v4 v4.3.1 github.com/fasthttp/websocket v1.5.10 diff --git a/go.sum b/go.sum index bb89162..7c16a45 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/dave/dst v0.27.3 h1:P1HPoMza3cMEquVf9kKy8yXsFirry4zEnWOdYPOoIzY= +github.com/dave/dst v0.27.3/go.mod h1:jHh6EOibnHgcUW3WjKHisiooEkYwqpHLBSX1iOBhEyc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -81,8 +83,6 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= -github.com/google/pprof v0.0.0-20241023014458-598669927662 h1:SKMkD83p7FwUqKmBsPdLHF5dNyxq3jOWwu9w9UyH5vA= -github.com/google/pprof v0.0.0-20241023014458-598669927662/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b h1:SXO0REt4iu865upYCk8aKBBJQ4BqoE0ReP23ClMu60s= github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -95,8 +95,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= -github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= github.com/kopoli/go-terminal-size v0.0.0-20170219200355-5c97524c8b54 h1:0SMHxjkLKNawqUjjnMlCtEdj6uWZjv0+qDZ3F6GOADI= @@ -159,8 +157,6 @@ golang.org/x/exp/typeparams v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:AbB0pIl golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20241112194109-818c5a804067 h1:adDmSQyFTCiv19j015EGKJBoaa7ElV0Q1Wovb/4G7NA= golang.org/x/lint v0.0.0-20241112194109-818c5a804067/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= @@ -194,7 +190,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -238,8 +233,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/go.work.sum b/go.work.sum index 83f656c..cfbad1e 100644 --- a/go.work.sum +++ b/go.work.sum @@ -55,6 +55,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWs github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04= github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= +github.com/dave/jennifer v1.5.0 h1:HmgPN93bVDpkQyYbqhCHj5QlgvUkvEOzMyEvKLgCRrg= +github.com/dave/jennifer v1.5.0/go.mod h1:4MnyiFIlZS3l5tSDn8VnzE6ffAhYBMB2SZntBsZGUok= github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= @@ -115,6 +117,8 @@ github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= +github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= +github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -131,7 +135,6 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos= -github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -166,3 +169,5 @@ google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/src-d/go-billy.v4 v4.3.2 h1:0SQA1pRztfTFx2miS8sA97XvooFeNOmvUenF4o0EcVg= +gopkg.in/src-d/go-billy.v4 v4.3.2/go.mod h1:nDjArDMp+XMs1aFAESLRjfGSgfvoYN0hDfzEk0GjC98= diff --git a/realy/add-event.go b/realy/add-event.go deleted file mode 100644 index 5f70cd2..0000000 --- a/realy/add-event.go +++ /dev/null @@ -1,86 +0,0 @@ -package realy - -import ( - "fmt" - "net/http" - "regexp" - "strings" - - "realy.lol/event" - "realy.lol/normalize" - "realy.lol/relay" - "realy.lol/relay/wrapper" - "realy.lol/store" -) - -var nip20prefixmatcher = regexp.MustCompile(`^\w+: `) - -// AddEvent has a business rule to add an event to the event store -func AddEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, origin S, - authedPubkey B) (accepted bool, message B) { - if ev == nil { - return false, normalize.Invalid.F("empty event") - } - - sto := rl.Storage(c) - wrapper := &wrapper.RelayWrapper{I: sto} - advancedSaver, _ := sto.(relay.AdvancedSaver) - - accept, notice, after := rl.AcceptEvent(c, ev, hr, origin, authedPubkey) - if !accept { - return false, normalize.Blocked.F(notice) - } - if ev.Tags.ContainsProtectedMarker() { - if len(authedPubkey) == 0 || !equals(ev.PubKey, authedPubkey) { - return false, B(fmt.Sprintf( - "event with relay marker tag '-' may only be published by matching npub: %0x is not %0x", - authedPubkey, ev.PubKey)) - } - } - if ev.Kind.IsEphemeral() { - // do not store ephemeral events - } else { - if advancedSaver != nil { - advancedSaver.BeforeSave(c, ev) - } - - if saveErr := wrapper.Publish(c, ev); chk.E(saveErr) { - switch saveErr { - case store.ErrDupEvent: - return false, normalize.Error.F(saveErr.Error()) - default: - errmsg := saveErr.Error() - if nip20prefixmatcher.MatchString(errmsg) { - if strings.Contains(errmsg, "tombstone") { - return false, normalize.Blocked.F( - "event was deleted, not storing it again") - } - return false, normalize.Error.F(errmsg) - } else { - return false, normalize.Error.F("failed to save (%s)", errmsg) - } - } - // } else { - // log.D.F("saved event %s", ev.Serialize()) - } - - if advancedSaver != nil { - advancedSaver.AfterSave(ev) - } - } - - var authRequired bool - if ar, ok := rl.(relay.Authenticator); ok { - authRequired = ar.AuthEnabled() - } - // if the AcceptEvent function returned a closure we run it after the publish - // has been done because at least for now this means it is an updated follow - // list of an owner or their follows - if after != nil { - after() - } - notifyListeners(authRequired, ev) - - accepted = true - return -} diff --git a/realy/admin.go b/realy/admin.go deleted file mode 100644 index 1aed0af..0000000 --- a/realy/admin.go +++ /dev/null @@ -1,118 +0,0 @@ -package realy - -import ( - "crypto/subtle" - "fmt" - "io" - "net/http" - "strings" - - "realy.lol/cmd/realy/app" - "realy.lol/context" - "realy.lol/hex" - "realy.lol/sha256" -) - -func (s *Server) HTTPAuth(r *http.Request) (authed bool) { - username, password, ok := r.BasicAuth() - if ok { - // Calculate SHA-256 hashes for the provided and expected - // usernames and passwords. - usernameHash := sha256.Sum256(B(username)) - passwordHash := sha256.Sum256(B(password)) - expectedUsernameHash := sha256.Sum256(B(s.adminUser)) - expectedPasswordHash := sha256.Sum256(B(s.adminPass)) - - // Use the subtle.ConstantTimeCompare() function to check if - // the provided username and password hashes equal the - // expected username and password hashes. ConstantTimeCompare - // will return 1 if the values are equal, or 0 otherwise. - // Importantly, we should to do the work to evaluate both the - // username and password before checking the return values to - // avoid leaking information. - usernameMatch := subtle.ConstantTimeCompare(usernameHash[:], - expectedUsernameHash[:]) == 1 - passwordMatch := subtle.ConstantTimeCompare(passwordHash[:], - expectedPasswordHash[:]) == 1 - if usernameMatch && passwordMatch { - return true - } - } - return -} - -func (s *Server) AuthFail(w http.ResponseWriter) { - w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`) - http.Error(w, "Unauthorized", http.StatusUnauthorized) -} - -func (s *Server) HandleAdmin(w http.ResponseWriter, r *http.Request) { - switch { - case strings.HasPrefix(r.URL.Path, "/export"): - if ok := s.HTTPAuth(r); !ok { - s.AuthFail(w) - return - } - log.I.F("export of event data requested on admin port") - store := s.relay.Storage(context.Bg()) - if strings.Count(r.URL.Path, "/") > 1 { - split := strings.Split(r.URL.Path, "/") - // there should be 3 for a valid path, an empty, "export" and the final parameter - if len(split) != 3 { - fmt.Fprintf(w, "incorrectly formatted export parameter: '%s'", - r.URL.Path) - return - } - switch split[2] { - case "users": - // todo: naughty reaching through interface here lol... but the relay - // implementation does have this feature and another impl may not. Perhaps add - // a new interface for grabbing the relay's allowed list, and rename things to - // be more clear. And add a method for fetching such a relay's allowed writers. - if rl, ok := s.relay.(*app.Relay); ok { - follows := make([]B, 0, len(rl.Followed)) - for f := range rl.Followed { - follows = append(follows, B(f)) - } - store.Export(s.Ctx, w, follows...) - } - default: - // this should be a hyphen separated list of hexadecimal pubkey values - var exportPubkeys []B - pubkeys := strings.Split(split[2], "-") - for _, pubkey := range pubkeys { - // check they are valid hex - pk, err := hex.Dec(pubkey) - if err != nil { - log.E.F("invalid public key '%s' in parameters", pubkey) - continue - } - exportPubkeys = append(exportPubkeys, pk) - } - store.Export(s.Ctx, w, exportPubkeys...) - } - } else { - store.Export(s.Ctx, w) - } - case strings.HasPrefix(r.URL.Path, "/import"): - if ok := s.HTTPAuth(r); !ok { - s.AuthFail(w) - return - } - log.I.F("import of event data requested on admin port %s", r.RequestURI) - store := s.relay.Storage(context.Bg()) - read := io.LimitReader(r.Body, r.ContentLength) - store.Import(read) - case strings.HasPrefix(r.URL.Path, "/shutdown"): - if ok := s.HTTPAuth(r); !ok { - s.AuthFail(w) - return - } - fmt.Fprintf(w, "shutting down") - defer r.Body.Close() - s.Shutdown() - default: - fmt.Fprintf(w, "todo: realy web interface page\n\n") - s.HandleNIP11(w, r) - } -} diff --git a/realy/handlerelayinfo.go b/realy/handlerelayinfo.go deleted file mode 100644 index 526888a..0000000 --- a/realy/handlerelayinfo.go +++ /dev/null @@ -1,59 +0,0 @@ -package realy - -import ( - _ "embed" - "encoding/json" - "net/http" - - "realy.lol/context" - "realy.lol/relay" - ri "realy.lol/relayinfo" - "realy.lol/store" -) - -//go:embed version -var version S - -func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - log.T.Ln("handling relay information document") - var info *ri.T - if informationer, ok := s.relay.(relay.Informationer); ok { - info = informationer.GetNIP11InformationDocument() - } else { - // 1, 11, 42, 70, 86, 9 - supportedNIPs := ri.GetList( - ri.BasicProtocol, - ri.EventDeletion, - ri.RelayInformationDocument, - ri.GenericTagQueries, - ri.NostrMarketplace, - ri.EventTreatment, - ri.CommandResults, - ri.ParameterizedReplaceableEvents, - ri.ProtectedEvents, - ) - var auther relay.Authenticator - if auther, ok = s.relay.(relay.Authenticator); ok && auther.ServiceUrl(r) != "" { - supportedNIPs = append(supportedNIPs, ri.Authentication.N()) - } - var storage store.I - if s.relay.Storage(context.Bg()) != nil { - if _, ok = storage.(relay.EventCounter); ok { - supportedNIPs = append(supportedNIPs, ri.CountingResults.N()) - } - } - log.T.Ln("supported NIPs", supportedNIPs) - info = &ri.T{ - Name: s.relay.Name(), - Description: "relay powered by the realy framework", - Nips: supportedNIPs, - Software: "https://realy.lol", - Version: version, - Limitation: ri.Limits{MaxLimit: s.maxLimit, AuthRequired: s.authRequired}, - Icon: "https://cdn.satellite.earth/ac9778868fbf23b63c47c769a74e163377e6ea94d3f0f31711931663d035c4f6.png", - } - } - if err := json.NewEncoder(w).Encode(info); chk.E(err) { - } -} diff --git a/realy/handlers.go b/realy/handlers.go deleted file mode 100644 index aca9be9..0000000 --- a/realy/handlers.go +++ /dev/null @@ -1,853 +0,0 @@ -package realy - -import ( - "bytes" - "context" - "crypto/rand" - "errors" - "fmt" - "net/http" - "sort" - "time" - - "realy.lol/kinds" - "realy.lol/units" - - "github.com/dgraph-io/badger/v4" - "github.com/fasthttp/websocket" - "golang.org/x/time/rate" - - "realy.lol/auth" - "realy.lol/bech32encoding" - "realy.lol/ec/bech32" - "realy.lol/envelopes" - "realy.lol/envelopes/authenvelope" - "realy.lol/envelopes/closedenvelope" - "realy.lol/envelopes/closeenvelope" - "realy.lol/envelopes/countenvelope" - "realy.lol/envelopes/eoseenvelope" - "realy.lol/envelopes/eventenvelope" - "realy.lol/envelopes/noticeenvelope" - "realy.lol/envelopes/okenvelope" - "realy.lol/envelopes/reqenvelope" - "realy.lol/event" - "realy.lol/filter" - "realy.lol/hex" - "realy.lol/ints" - "realy.lol/kind" - "realy.lol/normalize" - "realy.lol/relay" - "realy.lol/sha256" - "realy.lol/store" - "realy.lol/tag" - "realy.lol/web" -) - -// TODO: consider moving these to Server as config params -const ( - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = pongWait / 2 - - // Maximum message size allowed from peer. - maxMessageSize = 1 * units.Mb -) - -// TODO: consider moving these to Server as config params -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, -} - -const ChallengeLength = 16 -const ChallengeHRP = "nchal" - -func challenge(conn *websocket.Conn, req *http.Request, addr string) (ws *web.Socket) { - var err error - // create a new challenge for this connection - cb := make([]byte, ChallengeLength) - if _, err = rand.Read(cb); chk.E(err) { - // i never know what to do for this case, panic? usually just ignore, it should never happen - panic(err) - } - var b5 B - if b5, err = bech32encoding.ConvertForBech32(cb); chk.E(err) { - return - } - var encoded B - if encoded, err = bech32.Encode(bech32.B(ChallengeHRP), b5); chk.E(err) { - return - } - ws = web.NewSocket(conn, req, encoded) - return -} - -func (s *Server) handleMessage(c Ctx, ws *web.Socket, msg B, sto store.I) { - var notice B - var err E - var t S - var rem B - if t, rem, err = envelopes.Identify(msg); chk.E(err) { - notice = B(err.Error()) - } - switch t { - case eventenvelope.L: - notice = s.doEvent(c, ws, rem, sto) - case countenvelope.L: - notice = s.doCount(c, ws, rem, sto) - case reqenvelope.L: - notice = s.doReq(c, ws, rem, sto) - case closeenvelope.L: - notice = s.doClose(c, ws, rem, sto) - case authenvelope.L: - notice = s.doAuth(c, ws, rem, sto) - default: - if cwh, ok := s.relay.(relay.WebSocketHandler); ok { - cwh.HandleUnknownType(ws, t, rem) - } else { - notice = B(fmt.Sprintf("unknown envelope type %s\n%s", t, rem)) - } - } - if len(notice) > 0 { - log.D.F("notice %s", notice) - if err = noticeenvelope.NewFrom(notice).Write(ws); chk.E(err) { - } - } -} - -func (s *Server) doEvent(c Ctx, ws *web.Socket, req B, sto store.I) (msg B) { - log.T.F("doEvent %s %s", ws.RealRemote(), req) - var err E - var ok bool - var rem B - advancedDeleter, _ := sto.(relay.AdvancedDeleter) - - env := eventenvelope.NewSubmission() - if rem, err = env.UnmarshalJSON(req); chk.E(err) { - return - } - if len(rem) > 0 { - log.I.F("extra '%s'", rem) - } - accept, notice, after := s.relay.AcceptEvent(c, env.T, ws.Req(), ws.RealRemote(), - B(ws.Authed())) - if !accept { - var auther relay.Authenticator - if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { - if !ws.AuthRequested() { - if err = okenvelope.NewFrom(env.ID, false, - normalize.AuthRequired.F("auth required for request processing")). - Write(ws); chk.T(err) { - } - log.T.F("requesting auth from client %s", ws.RealRemote()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) { - return - } - ws.RequestAuth() - return - } else { - if err = okenvelope.NewFrom(env.ID, false, - normalize.AuthRequired.F("auth required for storing events")). - Write(ws); chk.T(err) { - } - log.T.F("requesting auth again from client %s", ws.RealRemote()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) { - return - } - return - } - } - // return an ok event containing any notice returned - if err = okenvelope.NewFrom(env.ID, false, - normalize.Invalid.F(notice)).Write(ws); chk.T(err) { - } - return - } - - // check id - if !equals(env.GetIDBytes(), env.ID) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Invalid.F("event id is computed incorrectly")).Write(ws); chk.E(err) { - return - } - return - } - - // check signature - if ok, err = env.Verify(); chk.T(err) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("failed to verify signature")).Write(ws); chk.E(err) { - return - } - } else if !ok { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("signature is invalid")).Write(ws); chk.E(err) { - return - } - return - } - if env.T.Kind.K == kind.Deletion.K { - // we only handle e and a tag deletes because kind based deletes are too indiscriminate. - log.I.F("delete event\n%s", env.T.Serialize()) - // event deletion -- nip-09 - for _, t := range env.Tags.Value() { - var res []*event.T - if t.Len() >= 2 { - switch { - case equals(t.Key(), B("e")): - evId := make(B, sha256.Size) - if _, err = hex.DecBytes(evId, t.Value()); chk.E(err) { - continue - } - // fetch event to be deleted - res, err = s.relay.Storage(c). - QueryEvents(c, &filter.T{IDs: tag.New(evId)}) - if err != nil { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) { - return - } - return - } - for i := range res { - if res[i].Kind.Equal(kind.Deletion) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Blocked.F( - "not processing or storing delete event containing delete event references")). - Write(ws); chk.E(err) { - return - } - } - if !equals(res[i].PubKey, env.T.PubKey) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Blocked.F( - "cannot delete other users' events")). - Write(ws); chk.E(err) { - return - } - } - } - case equals(t.Key(), B("a")): - split := bytes.Split(t.Value(), B{':'}) - if len(split) != 3 { - continue - } - kin := ints.New(uint16(0)) - if _, err = kin.UnmarshalJSON(split[0]); chk.E(err) { - return - } - kk := kind.New(kin.Uint16()) - if kk.Equal(kind.Deletion) { - // we don't delete delete events, period - if err = okenvelope.NewFrom(env.ID, false, - normalize.Blocked.F( - "delete event kind may not be deleted")). - Write(ws); chk.E(err) { - return - } - } - // if the kind is not parameterised replaceable, the tag is invalid and the - // delete event will not be saved. - if !kk.IsParameterizedReplaceable() { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F( - "delete tags with a tags containing non-parameterized-replaceable events cannot be processed")). - Write(ws); chk.E(err) { - return - } - } - // for this event kind, the second field of the tag value MUST be the pubkey - // of the author of the event - if !equals(split[1], env.T.PubKey) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Blocked.F( - "cannot delete other users' events")). - Write(ws); chk.E(err) { - return - } - } - f := filter.New() - f.Kinds.K = []*kind.T{kk} - aut := make(B, 0, len(split[1])/2) - if aut, err = hex.DecAppend(aut, split[1]); chk.E(err) { - return - } - f.Authors.Append(aut) - f.Tags.AppendTags(tag.New(B{'#', 'd'}, split[2])) - res, err = s.relay.Storage(c).QueryEvents(c, f) - if err != nil { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) { - return - } - return - } - } - } - if len(res) < 1 { - // this will happen if event is not in the database - continue - } - // filter out any events that are newer than the delete request, deletes only work - // backwards, old delete events might match newer events for a tags. - var resTmp []*event.T - for _, v := range res { - if env.T.CreatedAt.U64() >= v.CreatedAt.U64() { - resTmp = append(resTmp, v) - } - } - res = resTmp - for _, target := range res { - if target.Kind.K == kind.Deletion.K { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("cannot delete delete event %s", - env.ID)).Write(ws); chk.E(err) { - return - } - } - if target.CreatedAt.Int() > env.T.CreatedAt.Int() { - log.I.F("not deleting\n%d%\nbecause delete event is older\n%d", - target.CreatedAt.Int(), env.T.CreatedAt.Int()) - continue - } - // check if this can be deleted - if !equals(target.PubKey, env.PubKey) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F("only author can delete event")).Write(ws); chk.E(err) { - return - } - return - } - - if advancedDeleter != nil { - advancedDeleter.BeforeDelete(c, t.Value(), env.PubKey) - } - - // delete the event - if err = sto.DeleteEvent(c, target.EventID()); chk.T(err) { - if err = okenvelope.NewFrom(env.ID, false, - normalize.Error.F(err.Error())).Write(ws); chk.E(err) { - return - } - return - } - - if advancedDeleter != nil { - advancedDeleter.AfterDelete(t.Value(), env.PubKey) - } - } - res = nil - } - if err = okenvelope.NewFrom(env.ID, true).Write(ws); chk.E(err) { - return - } - // if the event is a deletion we still want to save it. - } - ok, reason := AddEvent(c, s.relay, env.T, ws.Req(), ws.RealRemote(), B(ws.Authed())) - if err = okenvelope.NewFrom(env.ID, ok, reason).Write(ws); chk.E(err) { - return - } - if after != nil { - after() - } - return -} - -func (s *Server) doCount(c context.Context, ws *web.Socket, req B, - store store.I) (msg B) { - - counter, ok := store.(relay.EventCounter) - if !ok { - return normalize.Restricted.F("this relay does not support NIP-45") - } - - if ws.AuthRequested() && len(ws.Authed()) == 0 { - // ignore requests until request is responded to - return - } - - var err E - var rem B - env := countenvelope.New() - if rem, err = env.UnmarshalJSON(req); chk.E(err) { - return normalize.Error.F(err.Error()) - } - if len(rem) > 0 { - log.I.F("extra '%s'", rem) - } - - if env.Subscription == nil || env.Subscription.String() == "" { - return normalize.Error.F("COUNT has no ") - } - allowed := env.Filters - if accepter, ok := s.relay.(relay.ReqAcceptor); ok { - var accepted bool - allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters, - B(ws.Authed())) - if !accepted || allowed == nil { - var auther relay.Authenticator - if auther, ok = s.relay.(relay.Authenticator); ok && - auther.AuthEnabled() && !ws.AuthRequested() { - - ws.RequestAuth() - if err = closedenvelope.NewFrom(env.Subscription, - normalize.AuthRequired.F("auth required for count processing")). - Write(ws); chk.E(err) { - } - log.I.F("requesting auth from client from %s", ws.RealRemote()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - return - } - } - } - if allowed != env.Filters { - defer func() { - // send out an auth request after processing filter if the filter was modified - var auther relay.Authenticator - var ok bool - if auther, ok = s.relay.(relay.Authenticator); ok && - auther.AuthEnabled() && !ws.AuthRequested() { - - ws.RequestAuth() - if err = closedenvelope.NewFrom(env.Subscription, - normalize.AuthRequired.F("auth required for request processing")). - Write(ws); chk.E(err) { - } - log.T.F("requesting auth from client from %s, challenge '%s'", - ws.RealRemote(), ws.Challenge()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - return - } - }() - } - var total N - var approx bool - for _, f := range allowed.F { - // prevent kind-4 events from being returned to unauthed users, only when - // authentication is a thing - var auther relay.Authenticator - if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { - if f.Kinds.Contains(kind.EncryptedDirectMessage) || f.Kinds.Contains(kind.GiftWrap) { - senders := f.Authors - receivers := f.Tags.GetAll(tag.New("p")) - switch { - case len(ws.Authed()) == 0: - // not authenticated - return normalize.Restricted.F( - "this realy does not serve kind-4 to unauthenticated users," + - " does your client implement NIP-42?") - case senders.Len() == 1 && - receivers.Len() < 2 && - equals(senders.F()[0], B(ws.Authed())): - // allowed filter: ws.authed is sole sender (filter specifies one or all - // receivers) - case receivers.Len() == 1 && - senders.Len() < 2 && - equals(receivers.N(0).Value(), B(ws.Authed())): - // allowed filter: ws.authed is sole receiver (filter specifies one or all - // senders) - default: - // restricted filter: do not return any events, even if other elements in - // filters array were not restricted). client should know better. - return normalize.Restricted.F("authenticated user does not have" + - " authorization for requested filters") - } - } - } - var count N - count, approx, err = counter.CountEvents(c, f) - if err != nil { - log.E.F("store: %v", err) - continue - } - total += count - } - var res *countenvelope.Response - if res, err = countenvelope.NewResponseFrom(env.Subscription.String(), N(total), - approx); chk.E(err) { - return - } - if err = res.Write(ws); chk.E(err) { - return - } - return -} - -func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) { - - if ws.AuthRequested() && len(ws.Authed()) == 0 { - // ignore requests until request is responded to - return - } - var err E - var rem B - env := reqenvelope.New() - if rem, err = env.UnmarshalJSON(req); chk.E(err) { - return normalize.Error.F(err.Error()) - } - if len(rem) > 0 { - log.I.F("extra '%s'", rem) - } - allowed := env.Filters - if accepter, ok := s.relay.(relay.ReqAcceptor); ok { - var accepted bool - allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters, - B(ws.Authed())) - if !accepted || allowed == nil { - var auther relay.Authenticator - if auther, ok = s.relay.(relay.Authenticator); ok && - auther.AuthEnabled() && !ws.AuthRequested() { - - ws.RequestAuth() - if err = closedenvelope.NewFrom(env.Subscription, - normalize.AuthRequired.F("auth required for request processing")). - Write(ws); chk.E(err) { - } - log.T.F("requesting auth from client from %s, challenge '%s'", - ws.RealRemote(), ws.Challenge()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - return - } - return - } - } - if allowed != env.Filters { - defer func() { - // send out an auth request after processing filter if the filter was modified - var auther relay.Authenticator - var ok bool - if auther, ok = s.relay.(relay.Authenticator); ok && - auther.AuthEnabled() && !ws.AuthRequested() { - - ws.RequestAuth() - if err = closedenvelope.NewFrom(env.Subscription, - normalize.AuthRequired.F("auth required for request processing")). - Write(ws); chk.E(err) { - } - log.T.F("requesting auth from client from %s, challenge '%s'", - ws.RealRemote(), ws.Challenge()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - return - } - }() - } - for _, f := range allowed.F { - var i uint - if filter.Present(f.Limit) { - if *f.Limit == 0 { - continue - } - i = *f.Limit - } - // prevent kind-4 events from being returned to unauthed users, - // only when authentication is a thing - if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { - if f.Kinds.IsPrivileged() { - - log.T.F("privileged request with auth enabled\n%s", f.Serialize()) - senders := f.Authors - receivers := f.Tags.GetAll(tag.New("#p")) - // log.I.S(senders, receivers) - // log.I.F("%0x\n%s\nsender:%v\nreceiver:%v", ws.AuthedBytes(), f.Serialize(), senders.Contains(ws.AuthedBytes()), - // receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes()))) - switch { - case len(ws.Authed()) == 0: - ws.RequestAuth() - if err = closedenvelope.NewFrom(env.Subscription, - normalize.AuthRequired.F("auth required for request processing")). - Write(ws); chk.E(err) { - } - log.I.F("requesting auth from client from %s", ws.RealRemote()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - // not authenticated - notice := normalize.Restricted.F( - "this realy does not serve kind-4 to unauthenticated users," + - " does your client implement NIP-42?") - return notice - case senders.Contains(ws.AuthedBytes()) || - receivers.ContainsAny(B("#p"), tag.New(ws.AuthedBytes())): - log.T.F("user %0x from %s allowed to query for privileged event", - ws.AuthedBytes(), ws.RealRemote()) - default: - // restricted filter: do not return any events, - // even if other elements in filters array were not restricted). - // client should know better. - return normalize.Restricted.F("authenticated user %0x does not have"+ - " authorization for requested filters", ws.AuthedBytes()) - } - } - } - var events event.Ts - log.D.F("query from %s %0x,%s", ws.RealRemote(), ws.AuthedBytes(), f.Serialize()) - if events, err = sto.QueryEvents(c, f); err != nil { - log.E.F("eventstore: %v", err) - if errors.Is(err, badger.ErrDBClosed) { - return - } - continue - } - // filter out events from authors in the user's mute list, because they are literally a - // waste of bandwidth from a user's perspective - if aut := ws.Authed(); ws.IsAuthed() { - var mutes event.Ts - if mutes, err = sto.QueryEvents(c, &filter.T{Authors: tag.New(aut), - Kinds: kinds.New(kind.MuteList)}); !chk.E(err) { - // found the users mute list, now, generate the list so we can filter on it - var mutePubs []B - for _, ev := range mutes { - for _, t := range ev.Tags.F() { - if equals(t.Key(), B("p")) { - var p B - if p, err = hex.Dec(S(t.Value())); chk.E(err) { - continue - } - mutePubs = append(mutePubs, p) - } - } - } - var tmp event.Ts - for _, ev := range events { - for _, pk := range mutePubs { - if equals(ev.PubKey, pk) { - continue - } - tmp = append(tmp, ev) - } - } - events = tmp - } - } - // sort in reverse chronological order - sort.Slice(events, func(i, j int) bool { - return events[i].CreatedAt.Int() > events[j].CreatedAt.Int() - }) - for _, ev := range events { - if s.options.skipEventFunc != nil && s.options.skipEventFunc(ev) { - continue - } - i-- - if i < 0 { - break - } - var res *eventenvelope.Result - if res, err = eventenvelope.NewResultWith(env.Subscription.T, ev); chk.E(err) { - return - } - if err = res.Write(ws); chk.E(err) { - return - } - } - } - if err = eoseenvelope.NewFrom(env.Subscription).Write(ws); chk.E(err) { - return - } - if env.Filters != allowed { - // don't add a subscription if the user is not authed but auth is required - return - } - setListener(env.Subscription.String(), ws, env.Filters) - return -} - -func (s *Server) doClose(c Ctx, ws *web.Socket, req B, store store.I) (note B) { - - var err E - var rem B - env := closeenvelope.New() - if rem, err = env.UnmarshalJSON(req); chk.E(err) { - return B(err.Error()) - } - if len(rem) > 0 { - log.I.F("extra '%s'", rem) - } - if env.ID.String() == "" { - return B("CLOSE has no ") - } - removeListenerId(ws, env.ID.String()) - return -} - -func (s *Server) doAuth(c Ctx, ws *web.Socket, req B, store store.I) (msg B) { - if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { - svcUrl := auther.ServiceUrl(ws.Req()) - if svcUrl == "" { - return - } - log.T.F("received auth response,%s", req) - var err E - var rem B - env := authenvelope.NewResponse() - if rem, err = env.UnmarshalJSON(req); chk.E(err) { - return - } - if len(rem) > 0 { - log.I.F("extra '%s'", rem) - } - - var valid bool - if valid, err = auth.Validate(env.Event, B(ws.Challenge()), svcUrl); chk.E(err) { - - if err := okenvelope.NewFrom(env.Event.ID, false, - normalize.Error.F(err.Error())).Write(ws); chk.E(err) { - - return B(err.Error()) - } - return normalize.Error.F(err.Error()) - - } else if !valid { - - if err = okenvelope.NewFrom(env.Event.ID, false, - normalize.Error.F("failed to authenticate")).Write(ws); chk.E(err) { - - return B(err.Error()) - } - return normalize.Restricted.F("auth response does not validate") - } else { - if err = okenvelope.NewFrom(env.Event.ID, true, B{}).Write(ws); chk.E(err) { - return - } - log.D.F("%s authed to pubkey,%0x", ws.RealRemote(), env.Event.PubKey) - ws.SetAuthed(S(env.Event.PubKey)) - } - } - return -} - -func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.E.F("failed to upgrade websocket: %v", err) - return - } - s.clientsMu.Lock() - defer s.clientsMu.Unlock() - s.clients[conn] = struct{}{} - ticker := time.NewTicker(pingPeriod) - - ip := conn.RemoteAddr().String() - var realIP S - if realIP = r.Header.Get("X-Forwarded-For"); realIP != "" { - ip = realIP // possible to be multiple comma separated - } else if realIP = r.Header.Get("X-Real-Ip"); realIP != "" { - ip = realIP - } - log.T.F("connected from %s", ip) - ws := challenge(conn, r, ip) - if s.options.perConnectionLimiter != nil { - ws.SetLimiter(rate.NewLimiter( - s.options.perConnectionLimiter.Limit(), - s.options.perConnectionLimiter.Burst(), - )) - } - - ctx, cancel := context.WithCancel(context.Background()) - - store := s.relay.Storage(ctx) - - // reader - go func() { - defer func() { - cancel() - ticker.Stop() - s.clientsMu.Lock() - if _, ok := s.clients[conn]; ok { - conn.Close() - delete(s.clients, conn) - removeListener(ws) - } - s.clientsMu.Unlock() - // log.T.F("disconnected from %s", ip) - }() - - conn.SetReadLimit(maxMessageSize) - conn.SetReadDeadline(time.Now().Add(pongWait)) - conn.SetPongHandler(func(S) E { - conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) - - if ws.AuthRequested() && len(ws.Authed()) == 0 { - log.I.F("requesting auth from client from %s", ws.RealRemote()) - if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { - return - } - // ignore requests until request is responded to - return - } - var message B - var typ N - for { - typ, message, err = conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError( - err, - websocket.CloseNormalClosure, - websocket.CloseGoingAway, // 1001 - websocket.CloseNoStatusReceived, // 1005 - websocket.CloseAbnormalClosure, // 1006 - ) { - log.W.F("unexpected close error from %s: %v", - r.Header.Get("X-Forwarded-For"), err) - } - break - } - - if ws.Limiter() != nil { - // NOTE: Wait will throttle the requests. - // To reject requests exceeding the limit, use if !ws.limiter.Allow() - if err := ws.Limiter().Wait(context.TODO()); chk.T(err) { - log.W.F("unexpected limiter error %v", err) - continue - } - } - - if typ == websocket.PingMessage { - if err = ws.WriteMessage(websocket.PongMessage, nil); chk.E(err) { - // probably should abort if error here? - } - continue - } - - go s.handleMessage(ctx, ws, message, store) - } - }() - - // writer - go func() { - defer func() { - cancel() - ticker.Stop() - conn.Close() - }() - var err E - for { - select { - case <-ticker.C: - err = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)) - if err != nil { - log.E.F("error writing ping: %v; closing websocket", err) - return - } - ws.RealRemote() - case <-ctx.Done(): - return - } - } - }() -} diff --git a/realy/listener.go b/realy/merged.go similarity index 62% rename from realy/listener.go rename to realy/merged.go index 04ee970..a7c71ee 100644 --- a/realy/listener.go +++ b/realy/merged.go @@ -1,42 +1,79 @@ package realy import ( + "crypto/rand" + "net/http" + "regexp" "sync" + "time" + "github.com/fasthttp/websocket" + + "realy.lol/bech32encoding" + "realy.lol/ec/bech32" "realy.lol/envelopes/eventenvelope" "realy.lol/event" "realy.lol/filters" "realy.lol/tag" + "realy.lol/units" "realy.lol/web" ) -type Listener struct { - filters *filters.T -} +type ( + Listener struct{ filters *filters.T } +) + +const ( + ChallengeHRP = "nchal" + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = pongWait / 2 + maxMessageSize = 1 * units.Mb + ChallengeLength = 16 +) var ( + nip20prefixmatcher = regexp.MustCompile(`^\w+: `) + upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }} listeners = make(map[*web.Socket]map[S]*Listener) listenersMutex sync.Mutex ) +func challenge(conn *websocket.Conn, req *http.Request, addr string) (ws *web.Socket) { + var err error + cb := make([]byte, ChallengeLength) + if _, err = rand.Read(cb); chk.E(err) { + panic(err) + } + var b5 B + if b5, err = bech32encoding.ConvertForBech32(cb); chk.E(err) { + return + } + var encoded B + if encoded, err = bech32.Encode(bech32.B(ChallengeHRP), b5); chk.E(err) { + return + } + ws = web.NewSocket(conn, req, encoded) + return +} + func setListener(id S, ws *web.Socket, ff *filters.T) { listenersMutex.Lock() defer listenersMutex.Unlock() - subs, ok := listeners[ws] if !ok { subs = make(map[S]*Listener) listeners[ws] = subs } - subs[id] = &Listener{filters: ff} } -// Remove a specific subscription id from listeners for a given ws client func removeListenerId(ws *web.Socket, id S) { listenersMutex.Lock() defer listenersMutex.Unlock() - if subs, ok := listeners[ws]; ok { delete(listeners[ws], id) if len(subs) == 0 { @@ -45,7 +82,6 @@ func removeListenerId(ws *web.Socket, id S) { } } -// Remove T conn from listeners func removeListener(ws *web.Socket) { listenersMutex.Lock() defer listenersMutex.Unlock() @@ -55,7 +91,6 @@ func removeListener(ws *web.Socket) { func notifyListeners(authRequired bool, ev *event.T) { if ev == nil { - // nothing to do return } var err E @@ -69,7 +104,6 @@ func notifyListeners(authRequired bool, ev *event.T) { if !listener.filters.Match(ev) { continue } - // is the subscriber authorized to see privileged event? if ev.Kind.IsPrivileged() { ab := ws.AuthedBytes() var containsPubkey bool diff --git a/realy/options/options.go b/realy/options/options.go new file mode 100644 index 0000000..05d2fc5 --- /dev/null +++ b/realy/options/options.go @@ -0,0 +1,30 @@ +package options + +import ( + "golang.org/x/time/rate" + + "realy.lol/event" +) + +type T struct { + PerConnectionLimiter *rate.Limiter + SkipEventFunc func(*event.T) bool +} + +type O func(*T) + +func Default() *T { + return &T{} +} + +func WithPerConnectionLimiter(rps rate.Limit, burst N) O { + return func(o *T) { + o.PerConnectionLimiter = rate.NewLimiter(rps, burst) + } +} + +func WithSkipEventFunc(skipEventFunc func(*event.T) bool) O { + return func(o *T) { + o.SkipEventFunc = skipEventFunc + } +} diff --git a/realy/options/util.go b/realy/options/util.go new file mode 100644 index 0000000..4723ebf --- /dev/null +++ b/realy/options/util.go @@ -0,0 +1,21 @@ +package options + +import ( + "bytes" + + "realy.lol/context" + "realy.lol/lol" +) + +type ( + B = []byte + S = string + E = error + N = int + Ctx = context.T +) + +var ( + log, chk, errorf = lol.Main.Log, lol.Main.Check, lol.Main.Errorf + equals = bytes.Equal +) diff --git a/realy/server.go b/realy/server.go index 36d00d4..e477a46 100644 --- a/realy/server.go +++ b/realy/server.go @@ -1,36 +1,60 @@ package realy import ( + "bytes" + "crypto/subtle" + _ "embed" + "encoding/json" "errors" "fmt" + "io" "net" "net/http" + "sort" "strconv" + "strings" "sync" "time" + "github.com/dgraph-io/badger/v4" "github.com/fasthttp/websocket" "github.com/rs/cors" "golang.org/x/time/rate" + "realy.lol/auth" + "realy.lol/cmd/realy/app" "realy.lol/context" + "realy.lol/envelopes" + "realy.lol/envelopes/authenvelope" + "realy.lol/envelopes/closedenvelope" + "realy.lol/envelopes/closeenvelope" + "realy.lol/envelopes/countenvelope" + "realy.lol/envelopes/eoseenvelope" + "realy.lol/envelopes/eventenvelope" + "realy.lol/envelopes/noticeenvelope" + "realy.lol/envelopes/okenvelope" + "realy.lol/envelopes/reqenvelope" "realy.lol/event" + "realy.lol/filter" + "realy.lol/hex" + "realy.lol/ints" + "realy.lol/kind" + "realy.lol/kinds" + "realy.lol/normalize" + "realy.lol/realy/options" "realy.lol/relay" + "realy.lol/relay/wrapper" + "realy.lol/relayinfo" + "realy.lol/sha256" + "realy.lol/store" + "realy.lol/tag" + "realy.lol/web" ) -// Server is a base for package users to implement nostr relays. -// It can serve HTTP requests and websockets, passing control over to a relay implementation. -// -// To implement a relay, it is enough to satisfy [Relay] interface. Other interfaces are -// [Informationer], [CustomWebSocketHandler], [ShutdownAware] and AdvancedXxx types. -// See their respective doc comments. -// -// The basic usage is to call Start, which starts serving immediately. -// For a more fine-grained control, use NewServer. type Server struct { Ctx Cancel context.F - options *Options + options *options.T relay relay.I clientsMu sync.Mutex clients map[*websocket.Conn]struct{} @@ -42,8 +66,6 @@ type Server struct { adminUser, adminPass S } -func (s *Server) Router() *http.ServeMux { return s.serveMux } - type ServerParams struct { Ctx Cancel context.F @@ -53,42 +75,27 @@ type ServerParams struct { AdminUser, AdminPass S } -// NewServer initializes the realy and its storage using their respective Init methods, -// returning any non-nil errors, and returns a Server ready to listen for HTTP requests. -func NewServer(sp ServerParams, opts ...Option) (*Server, E) { - options := DefaultOptions() +func NewServer(sp ServerParams, opts ...options.O) (*Server, E) { + op := options.Default() for _, opt := range opts { - opt(options) + opt(op) } var authRequired bool if ar, ok := sp.Rl.(relay.Authenticator); ok { authRequired = ar.AuthEnabled() } - srv := &Server{ - Ctx: sp.Ctx, - Cancel: sp.Cancel, - relay: sp.Rl, - clients: make(map[*websocket.Conn]struct{}), - serveMux: http.NewServeMux(), - options: options, - authRequired: authRequired, - maxLimit: sp.MaxLimit, - adminUser: sp.AdminUser, - adminPass: sp.AdminPass, - } - + srv := &Server{Ctx: sp.Ctx, Cancel: sp.Cancel, relay: sp.Rl, + clients: make(map[*websocket.Conn]struct{}), serveMux: http.NewServeMux(), + options: op, authRequired: authRequired, maxLimit: sp.MaxLimit, + adminUser: sp.AdminUser, adminPass: sp.AdminPass} if storage := sp.Rl.Storage(context.Bg()); storage != nil { if err := storage.Init(sp.DbPath); chk.T(err) { return nil, fmt.Errorf("storage init: %w", err) } } - - // init the relay if err := sp.Rl.Init(); chk.T(err) { return nil, fmt.Errorf("realy init: %w", err) } - - // start listening from events from other sources, if any if inj, ok := sp.Rl.(relay.Injector); ok { go func() { for ev := range inj.InjectEvents() { @@ -99,7 +106,824 @@ func NewServer(sp ServerParams, opts ...Option) (*Server, E) { return srv, nil } -// ServeHTTP implements http.Handler interface. +func (s *Server) HTTPAuth(r *http.Request) (authed bool) { + username, password, ok := r.BasicAuth() + if ok { + usernameHash := sha256.Sum256(B(username)) + passwordHash := sha256.Sum256(B(password)) + expectedUsernameHash := sha256.Sum256(B(s.adminUser)) + expectedPasswordHash := sha256.Sum256(B(s.adminPass)) + usernameMatch := subtle.ConstantTimeCompare(usernameHash[:], + expectedUsernameHash[:]) == 1 + passwordMatch := subtle.ConstantTimeCompare(passwordHash[:], + expectedPasswordHash[:]) == 1 + if usernameMatch && passwordMatch { + return true + } + } + return +} + +func (s *Server) AuthFail(w http.ResponseWriter) { + w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) +} + +func (s *Server) HandleAdmin(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasPrefix(r.URL.Path, "/export"): + if ok := s.HTTPAuth(r); !ok { + s.AuthFail(w) + return + } + log.I.F("export of event data requested on admin port") + sto := s.relay.Storage(context.Bg()) + if strings.Count(r.URL.Path, "/") > 1 { + split := strings.Split(r.URL.Path, "/") + if len(split) != 3 { + fprintf(w, "incorrectly formatted export parameter: '%s'", r.URL.Path) + return + } + switch split[2] { + case "users": + if rl, ok := s.relay.(*app.Relay); ok { + follows := make([]B, 0, len(rl.Followed)) + for f := range rl.Followed { + follows = append(follows, B(f)) + } + sto.Export(s.Ctx, w, follows...) + } + default: + var exportPubkeys []B + pubkeys := strings.Split(split[2], "-") + for _, pubkey := range pubkeys { + pk, err := hex.Dec(pubkey) + if err != nil { + log.E.F("invalid public key '%s' in parameters", pubkey) + continue + } + exportPubkeys = append(exportPubkeys, pk) + } + sto.Export(s.Ctx, w, exportPubkeys...) + } + } else { + sto.Export(s.Ctx, w) + } + case strings.HasPrefix(r.URL.Path, "/import"): + if ok := s.HTTPAuth(r); !ok { + s.AuthFail(w) + return + } + log.I.F("import of event data requested on admin port %s", r.RequestURI) + sto := s.relay.Storage(context.Bg()) + read := io.LimitReader(r.Body, r.ContentLength) + sto.Import(read) + case strings.HasPrefix(r.URL.Path, "/shutdown"): + if ok := s.HTTPAuth(r); !ok { + s.AuthFail(w) + return + } + fprintf(w, "shutting down") + defer chk.E(r.Body.Close()) + s.Shutdown() + default: + fprintf(w, "todo: realy web interface page\n\n") + s.HandleNIP11(w, r) + } +} + +func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + log.T.Ln("handling relay information document") + var info *relayinfo.T + if informationer, ok := s.relay.(relay.Informationer); ok { + info = informationer.GetNIP11InformationDocument() + } else { + supportedNIPs := relayinfo.GetList(relayinfo.BasicProtocol, relayinfo.EventDeletion, + relayinfo.RelayInformationDocument, relayinfo.GenericTagQueries, + relayinfo.NostrMarketplace, + relayinfo.EventTreatment, relayinfo.CommandResults, + relayinfo.ParameterizedReplaceableEvents, + relayinfo.ProtectedEvents) + var auther relay.Authenticator + if auther, ok = s.relay.(relay.Authenticator); ok && auther.ServiceUrl(r) != "" { + supportedNIPs = append(supportedNIPs, relayinfo.Authentication.N()) + } + var storage store.I + if s.relay.Storage(context.Bg()) != nil { + if _, ok = storage.(relay.EventCounter); ok { + supportedNIPs = append(supportedNIPs, relayinfo.CountingResults.N()) + } + } + log.T.Ln("supported NIPs", supportedNIPs) + info = &relayinfo.T{Name: s.relay.Name(), + Description: "relay powered by the realy framework", + Nips: supportedNIPs, Software: "https://realy.lol", Version: version, + Limitation: relayinfo.Limits{MaxLimit: s.maxLimit, AuthRequired: s.authRequired}, + Icon: "https://cdn.satellite.earth/ac9778868fbf23b63c47c769a74e163377e6ea94d3f0f31711931663d035c4f6.png"} + } + if err := json.NewEncoder(w).Encode(info); chk.E(err) { + } +} + +func addEvent(c Ctx, rl relay.I, ev *event.T, hr *http.Request, origin S, + authedPubkey B) (accepted bool, message B) { + if ev == nil { + return false, normalize.Invalid.F("empty event") + } + sto := rl.Storage(c) + wrap := &wrapper.Relay{I: sto} + advancedSaver, _ := sto.(relay.AdvancedSaver) + accept, notice, after := rl.AcceptEvent(c, ev, hr, origin, authedPubkey) + if !accept { + return false, normalize.Blocked.F(notice) + } + if ev.Tags.ContainsProtectedMarker() { + if len(authedPubkey) == 0 || !equals(ev.PubKey, authedPubkey) { + return false, B(fmt.Sprintf("event with relay marker tag '-' may only be published by matching npub: %0x is not %0x", + authedPubkey, ev.PubKey)) + } + } + if ev.Kind.IsEphemeral() { + } else { + if advancedSaver != nil { + advancedSaver.BeforeSave(c, ev) + } + if saveErr := wrap.Publish(c, ev); chk.E(saveErr) { + if errors.Is(saveErr, store.ErrDupEvent) { + return false, normalize.Error.F(saveErr.Error()) + } + errmsg := saveErr.Error() + if nip20prefixmatcher.MatchString(errmsg) { + if strings.Contains(errmsg, "tombstone") { + return false, normalize.Blocked.F("event was deleted, not storing it again") + } + return false, normalize.Error.F(errmsg) + } else { + return false, normalize.Error.F("failed to save (%s)", errmsg) + } + } + if advancedSaver != nil { + advancedSaver.AfterSave(ev) + } + } + var authRequired bool + if ar, ok := rl.(relay.Authenticator); ok { + authRequired = ar.AuthEnabled() + } + if after != nil { + after() + } + notifyListeners(authRequired, ev) + accepted = true + return +} + +func (s *Server) doEvent(c Ctx, ws *web.Socket, req B, sto store.I) (msg B) { + log.T.F("doEvent %s %s", ws.RealRemote(), req) + var err E + var ok bool + var rem B + advancedDeleter, _ := sto.(relay.AdvancedDeleter) + env := eventenvelope.NewSubmission() + if rem, err = env.UnmarshalJSON(req); chk.E(err) { + return + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + accept, notice, after := s.relay.AcceptEvent(c, env.T, ws.Req(), ws.RealRemote(), + B(ws.Authed())) + if !accept { + var auther relay.Authenticator + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { + if !ws.AuthRequested() { + if err = okenvelope.NewFrom(env.ID, false, + normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.T(err) { + } + log.T.F("requesting auth from client %s", ws.RealRemote()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) { + return + } + ws.RequestAuth() + return + } else { + if err = okenvelope.NewFrom(env.ID, false, + normalize.AuthRequired.F("auth required for storing events")).Write(ws); chk.T(err) { + } + log.T.F("requesting auth again from client %s", ws.RealRemote()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.T(err) { + return + } + return + } + } + if err = okenvelope.NewFrom(env.ID, false, + normalize.Invalid.F(notice)).Write(ws); chk.T(err) { + } + return + } + if !equals(env.GetIDBytes(), env.ID) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Invalid.F("event id is computed incorrectly")).Write(ws); chk.E(err) { + return + } + return + } + if ok, err = env.Verify(); chk.T(err) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("failed to verify signature")).Write(ws); chk.E(err) { + return + } + } else if !ok { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("signature is invalid")).Write(ws); chk.E(err) { + return + } + return + } + if env.T.Kind.K == kind.Deletion.K { + log.I.F("delete event\n%s", env.T.Serialize()) + for _, t := range env.Tags.Value() { + var res []*event.T + if t.Len() >= 2 { + switch { + case equals(t.Key(), B("e")): + evId := make(B, sha256.Size) + if _, err = hex.DecBytes(evId, t.Value()); chk.E(err) { + continue + } + res, err = s.relay.Storage(c).QueryEvents(c, &filter.T{IDs: tag.New(evId)}) + if err != nil { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) { + return + } + return + } + for i := range res { + if res[i].Kind.Equal(kind.Deletion) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Blocked.F("not processing or storing delete event containing delete event references")).Write(ws); chk.E(err) { + return + } + } + if !equals(res[i].PubKey, env.T.PubKey) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Blocked.F("cannot delete other users' events")).Write(ws); chk.E(err) { + return + } + } + } + case equals(t.Key(), B("a")): + split := bytes.Split(t.Value(), B{':'}) + if len(split) != 3 { + continue + } + kin := ints.New(uint16(0)) + if _, err = kin.UnmarshalJSON(split[0]); chk.E(err) { + return + } + kk := kind.New(kin.Uint16()) + if kk.Equal(kind.Deletion) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Blocked.F("delete event kind may not be deleted")).Write(ws); chk.E(err) { + return + } + } + if !kk.IsParameterizedReplaceable() { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("delete tags with a tags containing non-parameterized-replaceable events cannot be processed")).Write(ws); chk.E(err) { + return + } + } + if !equals(split[1], env.T.PubKey) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Blocked.F("cannot delete other users' events")).Write(ws); chk.E(err) { + return + } + } + f := filter.New() + f.Kinds.K = []*kind.T{kk} + aut := make(B, 0, len(split[1])/2) + if aut, err = hex.DecAppend(aut, split[1]); chk.E(err) { + return + } + f.Authors.Append(aut) + f.Tags.AppendTags(tag.New(B{'#', 'd'}, split[2])) + res, err = s.relay.Storage(c).QueryEvents(c, f) + if err != nil { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("failed to query for target event")).Write(ws); chk.E(err) { + return + } + return + } + } + } + if len(res) < 1 { + continue + } + var resTmp []*event.T + for _, v := range res { + if env.T.CreatedAt.U64() >= v.CreatedAt.U64() { + resTmp = append(resTmp, v) + } + } + res = resTmp + for _, target := range res { + if target.Kind.K == kind.Deletion.K { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("cannot delete delete event %s", + env.ID)).Write(ws); chk.E(err) { + return + } + } + if target.CreatedAt.Int() > env.T.CreatedAt.Int() { + log.I.F("not deleting\n%d%\nbecause delete event is older\n%d", + target.CreatedAt.Int(), env.T.CreatedAt.Int()) + continue + } + if !equals(target.PubKey, env.PubKey) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F("only author can delete event")).Write(ws); chk.E(err) { + return + } + return + } + if advancedDeleter != nil { + advancedDeleter.BeforeDelete(c, t.Value(), env.PubKey) + } + if err = sto.DeleteEvent(c, target.EventID()); chk.T(err) { + if err = okenvelope.NewFrom(env.ID, false, + normalize.Error.F(err.Error())).Write(ws); chk.E(err) { + return + } + return + } + if advancedDeleter != nil { + advancedDeleter.AfterDelete(t.Value(), env.PubKey) + } + } + res = nil + } + if err = okenvelope.NewFrom(env.ID, true).Write(ws); chk.E(err) { + return + } + } + ok, reason := addEvent(c, s.relay, env.T, ws.Req(), ws.RealRemote(), B(ws.Authed())) + if err = okenvelope.NewFrom(env.ID, ok, reason).Write(ws); chk.E(err) { + return + } + if after != nil { + after() + } + return +} + +func (s *Server) doCount(c context.T, ws *web.Socket, req B, store store.I) (msg B) { + counter, ok := store.(relay.EventCounter) + if !ok { + return normalize.Restricted.F("this relay does not support NIP-45") + } + if ws.AuthRequested() && len(ws.Authed()) == 0 { + return + } + var err E + var rem B + env := countenvelope.New() + if rem, err = env.UnmarshalJSON(req); chk.E(err) { + return normalize.Error.F(err.Error()) + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + if env.Subscription == nil || env.Subscription.String() == "" { + return normalize.Error.F("COUNT has no ") + } + allowed := env.Filters + if accepter, ok := s.relay.(relay.ReqAcceptor); ok { + var accepted bool + allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters, + B(ws.Authed())) + if !accepted || allowed == nil { + var auther relay.Authenticator + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() { + ws.RequestAuth() + if err = closedenvelope.NewFrom(env.Subscription, + normalize.AuthRequired.F("auth required for count processing")).Write(ws); chk.E(err) { + } + log.I.F("requesting auth from client from %s", ws.RealRemote()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + return + } + } + } + if allowed != env.Filters { + defer func() { + var auther relay.Authenticator + var ok bool + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() { + ws.RequestAuth() + if err = closedenvelope.NewFrom(env.Subscription, + normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) { + } + log.T.F("requesting auth from client from %s, challenge '%s'", ws.RealRemote(), + ws.Challenge()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + return + } + }() + } + var total N + var approx bool + if allowed != nil { + for _, f := range allowed.F { + var auther relay.Authenticator + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { + if f.Kinds.Contains(kind.EncryptedDirectMessage) || f.Kinds.Contains(kind.GiftWrap) { + senders := f.Authors + receivers := f.Tags.GetAll(tag.New("p")) + switch { + case len(ws.Authed()) == 0: + return normalize.Restricted.F("this realy does not serve kind-4 to unauthenticated users," + " does your client implement NIP-42?") + case senders.Len() == 1 && receivers.Len() < 2 && equals(senders.F()[0], + B(ws.Authed())): + case receivers.Len() == 1 && senders.Len() < 2 && equals(receivers.N(0).Value(), + B(ws.Authed())): + default: + return normalize.Restricted.F("authenticated user does not have" + " authorization for requested filters") + } + } + } + var count N + count, approx, err = counter.CountEvents(c, f) + if err != nil { + log.E.F("store: %v", err) + continue + } + total += count + } + } + var res *countenvelope.Response + if res, err = countenvelope.NewResponseFrom(env.Subscription.T, total, approx); chk.E(err) { + return + } + if err = res.Write(ws); chk.E(err) { + return + } + return +} + +func (s *Server) doReq(c Ctx, ws *web.Socket, req B, sto store.I) (r B) { + if ws.AuthRequested() && len(ws.Authed()) == 0 { + return + } + var err E + var rem B + env := reqenvelope.New() + if rem, err = env.UnmarshalJSON(req); chk.E(err) { + return normalize.Error.F(err.Error()) + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + allowed := env.Filters + if accepter, ok := s.relay.(relay.ReqAcceptor); ok { + var accepted bool + allowed, accepted = accepter.AcceptReq(c, ws.Req(), env.Subscription.T, env.Filters, + B(ws.Authed())) + if !accepted || allowed == nil { + var auther relay.Authenticator + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() { + ws.RequestAuth() + if err = closedenvelope.NewFrom(env.Subscription, + normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) { + } + log.T.F("requesting auth from client from %s, challenge '%s'", ws.RealRemote(), + ws.Challenge()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + return + } + return + } + } + if allowed != env.Filters { + defer func() { + var auther relay.Authenticator + var ok bool + if auther, ok = s.relay.(relay.Authenticator); ok && auther.AuthEnabled() && !ws.AuthRequested() { + ws.RequestAuth() + if err = closedenvelope.NewFrom(env.Subscription, + normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) { + } + log.T.F("requesting auth from client from %s, challenge '%s'", ws.RealRemote(), + ws.Challenge()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + return + } + }() + } + for _, f := range allowed.F { + var i uint + if filter.Present(f.Limit) { + if *f.Limit == 0 { + continue + } + i = *f.Limit + } + if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { + if f.Kinds.IsPrivileged() { + log.T.F("privileged request with auth enabled\n%s", f.Serialize()) + senders := f.Authors + receivers := f.Tags.GetAll(tag.New("#p")) + switch { + case len(ws.Authed()) == 0: + ws.RequestAuth() + if err = closedenvelope.NewFrom(env.Subscription, + normalize.AuthRequired.F("auth required for request processing")).Write(ws); chk.E(err) { + } + log.I.F("requesting auth from client from %s", ws.RealRemote()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + notice := normalize.Restricted.F("this realy does not serve kind-4 to unauthenticated users," + " does your client implement NIP-42?") + return notice + case senders.Contains(ws.AuthedBytes()) || receivers.ContainsAny(B("#p"), + tag.New(ws.AuthedBytes())): + log.T.F("user %0x from %s allowed to query for privileged event", + ws.AuthedBytes(), ws.RealRemote()) + default: + return normalize.Restricted.F("authenticated user %0x does not have"+" authorization for requested filters", + ws.AuthedBytes()) + } + } + } + var events event.Ts + log.D.F("query from %s %0x,%s", ws.RealRemote(), ws.AuthedBytes(), f.Serialize()) + if events, err = sto.QueryEvents(c, f); err != nil { + log.E.F("eventstore: %v", err) + if errors.Is(err, badger.ErrDBClosed) { + return + } + continue + } + if aut := ws.Authed(); ws.IsAuthed() { + var mutes event.Ts + if mutes, err = sto.QueryEvents(c, &filter.T{Authors: tag.New(aut), + Kinds: kinds.New(kind.MuteList)}); !chk.E(err) { + var mutePubs []B + for _, ev := range mutes { + for _, t := range ev.Tags.F() { + if equals(t.Key(), B("p")) { + var p B + if p, err = hex.Dec(S(t.Value())); chk.E(err) { + continue + } + mutePubs = append(mutePubs, p) + } + } + } + var tmp event.Ts + for _, ev := range events { + for _, pk := range mutePubs { + if equals(ev.PubKey, pk) { + continue + } + tmp = append(tmp, ev) + } + } + events = tmp + } + } + sort.Slice(events, func(i, j int) bool { + return events[i].CreatedAt.Int() > events[j].CreatedAt.Int() + }) + for _, ev := range events { + if s.options.SkipEventFunc != nil && s.options.SkipEventFunc(ev) { + continue + } + i-- + if i < 0 { + break + } + var res *eventenvelope.Result + if res, err = eventenvelope.NewResultWith(env.Subscription.T, ev); chk.E(err) { + return + } + if err = res.Write(ws); chk.E(err) { + return + } + } + } + if err = eoseenvelope.NewFrom(env.Subscription).Write(ws); chk.E(err) { + return + } + if env.Filters != allowed { + return + } + setListener(env.Subscription.String(), ws, env.Filters) + return +} + +func (s *Server) doClose(ws *web.Socket, req B) (note B) { + var err E + var rem B + env := closeenvelope.New() + if rem, err = env.UnmarshalJSON(req); chk.E(err) { + return B(err.Error()) + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + if env.ID.String() == "" { + return B("CLOSE has no ") + } + removeListenerId(ws, env.ID.String()) + return +} + +func (s *Server) doAuth(ws *web.Socket, req B) (msg B) { + if auther, ok := s.relay.(relay.Authenticator); ok && auther.AuthEnabled() { + svcUrl := auther.ServiceUrl(ws.Req()) + if svcUrl == "" { + return + } + log.T.F("received auth response,%s", req) + var err E + var rem B + env := authenvelope.NewResponse() + if rem, err = env.UnmarshalJSON(req); chk.E(err) { + return + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + var valid bool + if valid, err = auth.Validate(env.Event, B(ws.Challenge()), svcUrl); chk.E(err) { + if err := okenvelope.NewFrom(env.Event.ID, false, + normalize.Error.F(err.Error())).Write(ws); chk.E(err) { + return B(err.Error()) + } + return normalize.Error.F(err.Error()) + } else if !valid { + if err = okenvelope.NewFrom(env.Event.ID, false, + normalize.Error.F("failed to authenticate")).Write(ws); chk.E(err) { + return B(err.Error()) + } + return normalize.Restricted.F("auth response does not validate") + } else { + if err = okenvelope.NewFrom(env.Event.ID, true, B{}).Write(ws); chk.E(err) { + return + } + log.D.F("%s authed to pubkey,%0x", ws.RealRemote(), env.Event.PubKey) + ws.SetAuthed(S(env.Event.PubKey)) + } + } + return +} + +func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.E.F("failed to upgrade websocket: %v", err) + return + } + s.clientsMu.Lock() + defer s.clientsMu.Unlock() + s.clients[conn] = struct{}{} + ticker := time.NewTicker(pingPeriod) + ip := conn.RemoteAddr().String() + var realIP S + if realIP = r.Header.Get("X-Forwarded-For"); realIP != "" { + ip = realIP + } else if realIP = r.Header.Get("X-Real-Ip"); realIP != "" { + ip = realIP + } + log.T.F("connected from %s", ip) + ws := challenge(conn, r, ip) + if s.options.PerConnectionLimiter != nil { + ws.SetLimiter(rate.NewLimiter(s.options.PerConnectionLimiter.Limit(), + s.options.PerConnectionLimiter.Burst())) + } + ctx, cancel := context.Cancel(context.Bg()) + sto := s.relay.Storage(ctx) + go func() { + defer func() { + cancel() + ticker.Stop() + s.clientsMu.Lock() + if _, ok := s.clients[conn]; ok { + chk.E(conn.Close()) + delete(s.clients, conn) + removeListener(ws) + } + s.clientsMu.Unlock() + }() + conn.SetReadLimit(maxMessageSize) + chk.E(conn.SetReadDeadline(time.Now().Add(pongWait))) + conn.SetPongHandler(func(S) E { + chk.E(conn.SetReadDeadline(time.Now().Add(pongWait))) + return nil + }) + if ws.AuthRequested() && len(ws.Authed()) == 0 { + log.I.F("requesting auth from client from %s", ws.RealRemote()) + if err = authenvelope.NewChallengeWith(ws.Challenge()).Write(ws); chk.E(err) { + return + } + return + } + var message B + var typ N + for { + typ, message, err = conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, + websocket.CloseGoingAway, websocket.CloseNoStatusReceived, + websocket.CloseAbnormalClosure) { + log.W.F("unexpected close error from %s: %v", + r.Header.Get("X-Forwarded-For"), err) + } + break + } + if ws.Limiter() != nil { + if err := ws.Limiter().Wait(context.TODO()); chk.T(err) { + log.W.F("unexpected limiter error %v", err) + continue + } + } + if typ == websocket.PingMessage { + if err = ws.WriteMessage(websocket.PongMessage, nil); chk.E(err) { + } + continue + } + go s.handleMessage(ctx, ws, message, sto) + } + }() + go func() { + defer func() { + cancel() + ticker.Stop() + chk.E(conn.Close()) + }() + var err E + for { + select { + case <-ticker.C: + err = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)) + if err != nil { + log.E.F("error writing ping: %v; closing websocket", err) + return + } + ws.RealRemote() + case <-ctx.Done(): + return + } + } + }() +} + +func (s *Server) handleMessage(c Ctx, ws *web.Socket, msg B, sto store.I) { + var notice B + var err E + var t S + var rem B + if t, rem, err = envelopes.Identify(msg); chk.E(err) { + notice = B(err.Error()) + } + switch t { + case eventenvelope.L: + notice = s.doEvent(c, ws, rem, sto) + case countenvelope.L: + notice = s.doCount(c, ws, rem, sto) + case reqenvelope.L: + notice = s.doReq(c, ws, rem, sto) + case closeenvelope.L: + notice = s.doClose(ws, rem) + case authenvelope.L: + notice = s.doAuth(ws, rem) + default: + if cwh, ok := s.relay.(relay.WebSocketHandler); ok { + cwh.HandleUnknownType(ws, t, rem) + } else { + notice = B(fmt.Sprintf("unknown envelope type %s\n%s", t, rem)) + } + } + if len(notice) > 0 { + log.D.F("notice %s", notice) + if err = noticeenvelope.NewFrom(notice).Write(ws); chk.E(err) { + } + } +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Upgrade") == "websocket" { s.HandleWebsocket(w, r) @@ -119,14 +943,9 @@ func (s *Server) Start(host S, port int, started ...chan bool) E { return err } s.Addr = ln.Addr().String() - s.httpServer = &http.Server{ - Handler: cors.Default().Handler(s), - Addr: addr, - WriteTimeout: 7 * time.Second, - ReadTimeout: 7 * time.Second, - IdleTimeout: 28 * time.Second, - } - // notify caller that we're starting + s.httpServer = &http.Server{Handler: cors.Default().Handler(s), Addr: addr, + WriteTimeout: 7 * time.Second, ReadTimeout: 7 * time.Second, + IdleTimeout: 28 * time.Second} for _, startedC := range started { close(startedC) } @@ -136,11 +955,6 @@ func (s *Server) Start(host S, port int, started ...chan bool) E { return nil } -// Shutdown sends a websocket close control message to all connected clients. -// -// If the realy is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is. -// Note that the HTTP server make some time to shutdown and so the context deadline, -// if any, may have been shortened by the time OnShutdown is called. func (s *Server) Shutdown() { log.I.Ln("shutting down relay") s.Cancel() @@ -148,38 +962,21 @@ func (s *Server) Shutdown() { defer s.clientsMu.Unlock() for conn := range s.clients { log.I.Ln("disconnecting", conn.RemoteAddr()) - conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second)) - conn.Close() + chk.E(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) + chk.E(conn.Close()) delete(s.clients, conn) } log.W.Ln("closing event store") - s.relay.Storage(s.Ctx).Close() + chk.E(s.relay.Storage(s.Ctx).Close()) log.W.Ln("shutting down relay listener") - s.httpServer.Shutdown(s.Ctx) + chk.E(s.httpServer.Shutdown(s.Ctx)) if f, ok := s.relay.(relay.ShutdownAware); ok { f.OnShutdown(s.Ctx) } } -type Option func(*Options) - -type Options struct { - perConnectionLimiter *rate.Limiter - skipEventFunc func(*event.T) bool +func (s *Server) Router() *http.ServeMux { + return s.serveMux } -func DefaultOptions() *Options { - return &Options{} -} - -func WithPerConnectionLimiter(rps rate.Limit, burst N) Option { - return func(o *Options) { - o.perConnectionLimiter = rate.NewLimiter(rps, burst) - } -} - -func WithSkipEventFunc(skipEventFunc func(*event.T) bool) Option { - return func(o *Options) { - o.skipEventFunc = skipEventFunc - } -} +func fprintf(w io.Writer, format S, a ...any) { _, _ = fmt.Fprintf(w, format, a...) } diff --git a/realy/version.go b/realy/version.go new file mode 100644 index 0000000..5f66d33 --- /dev/null +++ b/realy/version.go @@ -0,0 +1,6 @@ +package realy + +import _ "embed" + +//go:embed version +var version S diff --git a/relay/wrapper/relay_interface.go b/relay/wrapper/relay_interface.go index 2409ad3..78da863 100644 --- a/relay/wrapper/relay_interface.go +++ b/relay/wrapper/relay_interface.go @@ -21,13 +21,13 @@ type RelayInterface interface { QuerySync(c Ctx, f *filter.T, opts ...ws.SubscriptionOption) ([]*event.T, E) } -type RelayWrapper struct { +type Relay struct { store.I } -var _ RelayInterface = (*RelayWrapper)(nil) +var _ RelayInterface = (*Relay)(nil) -func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) { +func (w Relay) Publish(c Ctx, evt *event.T) (err E) { if evt.Kind.IsEphemeral() { // do not store ephemeral events return nil @@ -127,7 +127,7 @@ func (w RelayWrapper) Publish(c Ctx, evt *event.T) (err E) { return } -func (w RelayWrapper) QuerySync(c Ctx, f *filter.T, +func (w Relay) QuerySync(c Ctx, f *filter.T, opts ...ws.SubscriptionOption) ([]*event.T, E) { evs, err := w.I.QueryEvents(c, f)