implement cluster replication

todo: need to add chain of senders in a header to a header to prevent unnecessary sends
This commit is contained in:
2025-07-31 15:55:07 +01:00
parent 6935575654
commit c62d685fa4
18 changed files with 312 additions and 109 deletions

View File

@@ -190,6 +190,5 @@ func Post(f string, ur *url.URL, sign signer.I) (err error) {
if io.Copy(os.Stdout, res.Body); chk.E(err) {
return
}
fmt.Println()
return
}

View File

@@ -42,8 +42,7 @@ type C struct {
Private bool `env:"ORLY_PRIVATE" usage:"do not spider for user metadata because the relay is private and this would leak relay memberships" default:"false"`
Whitelist []string `env:"ORLY_WHITELIST" usage:"only allow connections from this list of IP addresses"`
RelaySecret string `env:"ORLY_SECRET_KEY" usage:"secret key for relay cluster replication authentication"`
PeerRelayURLs []string `env:"ORLY_PEER_RELAY_URLS" usage:"list of peer relays URLs that new events are pushed to"`
PeerRelayKeys []string `env:"ORLY_PEER_RELAY_KEYS" usage:"list of peer relay public keys that have full read/write privilege"`
PeerRelays []string `env:"ORLY_PEER_RELAYS" usage:"list of peer relays URLs that new events are pushed to in format <pubkey>|<url>"`
}
// New creates and initializes a new configuration object for the relay

View File

@@ -1,8 +1,17 @@
package relay
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"orly.dev/pkg/crypto/ec/secp256k1"
"orly.dev/pkg/protocol/httpauth"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log"
realy_lol "orly.dev/pkg/version"
"regexp"
"strings"
@@ -17,6 +26,21 @@ var (
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
)
var userAgent = fmt.Sprintf("orly/%s", realy_lol.V)
type WriteCloser struct {
*bytes.Buffer
}
func (w *WriteCloser) Close() error {
w.Buffer.Reset()
return nil
}
func NewWriteCloser(w []byte) *WriteCloser {
return &WriteCloser{bytes.NewBuffer(w)}
}
// AddEvent processes an incoming event, saves it if valid, and delivers it to
// subscribers.
//
@@ -55,6 +79,7 @@ var (
// relevant message.
func (s *Server) AddEvent(
c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
pubkey []byte,
) (accepted bool, message []byte) {
if ev == nil {
@@ -85,6 +110,62 @@ func (s *Server) AddEvent(
}
// notify subscribers
s.listeners.Deliver(ev)
// push the new event to replicas if replicas are configured, and the relay
// has an identity key.
//
// TODO: add the chain of pubkeys of replicas that send and were received from replicas sending so they can
// be skipped for large (5+) clusters.
var err error
if len(s.Peers.Addresses) > 0 &&
len(s.Peers.I.Sec()) == secp256k1.SecKeyBytesLen {
evb := ev.Marshal(nil)
var payload io.ReadCloser
payload = NewWriteCloser(evb)
for i, a := range s.Peers.Addresses {
// the peer address index is the same as the list of pubkeys
// (they're unpacked from a string containing both, appended at the
// same time), so if the pubkey the http event endpoint sent us here
// matches the index of this address, we can skip it.
if bytes.Equal(s.Peers.Pubkeys[i], pubkey) {
log.I.F(
"not sending back to replica that just sent us this event %0x",
ev.ID,
)
continue
}
var ur *url.URL
if ur, err = url.Parse(a + "/api/event"); chk.E(err) {
continue
}
var r *http.Request
r = &http.Request{
Method: "POST",
URL: ur,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
Body: payload,
ContentLength: int64(len(evb)),
Host: ur.Host,
}
r.Header.Add("User-Agent", userAgent)
if err = httpauth.AddNIP98Header(
r, ur, "POST", "", s.Peers.I, 0,
); chk.E(err) {
continue
}
r.GetBody = func() (rc io.ReadCloser, err error) {
rc = payload
return
}
client := &http.Client{}
if _, err = client.Do(r); chk.E(err) {
continue
}
log.I.F("event pushed to replica %s", ur.String())
}
}
accepted = true
return
}

66
pkg/app/relay/peers.go Normal file
View File

@@ -0,0 +1,66 @@
package relay
import (
"orly.dev/pkg/crypto/p256k"
"orly.dev/pkg/encoders/bech32encoding"
"orly.dev/pkg/interfaces/signer"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/keys"
"orly.dev/pkg/utils/log"
"strings"
)
// Peers is a structure that keeps the information required when peer
// replication is enabled.
//
// - Addresses are the relay addresses that will be pushed new events when
// accepted. From ORLY_PEER_RELAYS first field after the |.
//
// - Pubkeys are the relay peer public keys that we will send any event to
// including privileged type. From ORLY_PEER_RELAYS before the |.
//
// - I - the signer of this relay, generated from the nsec in
// ORLY_SECRET_KEY.
type Peers struct {
Addresses []string
Pubkeys [][]byte
signer.I
}
// Init accepts the lists which will come from config.C for peer relay settings
// and populate the Peers with this data after decoding it.
func (p *Peers) Init(
addresses []string, sec string,
) (err error) {
for _, address := range addresses {
split := strings.Split(address, "@")
if len(split) != 2 {
log.E.F("invalid peer address: %s", address)
continue
}
p.Addresses = append(p.Addresses, split[1])
var pk []byte
if pk, err = keys.DecodeNpubOrHex(split[0]); chk.D(err) {
continue
}
p.Pubkeys = append(p.Pubkeys, pk)
log.I.F("peer %s added; pubkey: %0x", split[1], pk)
}
p.I = &p256k.Signer{}
var s []byte
if s, err = keys.DecodeNsecOrHex(sec); chk.E(err) {
return
}
if err = p.I.InitSec(s); chk.E(err) {
return
}
var npub []byte
if npub, err = bech32encoding.BinToNpub(p.I.Pub()); chk.E(err) {
return
}
log.I.F(
"relay peer initialized, relay's npub: %s",
npub,
)
return
}

View File

@@ -39,6 +39,7 @@ type Server struct {
listeners *publish.S
*config.C
*Lists
*Peers
Mux *servemux.S
}
@@ -102,7 +103,11 @@ func NewServer(
options: op,
C: sp.C,
Lists: new(Lists),
Peers: new(Peers),
}
chk.E(
s.Peers.Init(sp.C.PeerRelays, sp.C.RelaySecret),
)
s.listeners = publish.New(socketapi.New(s), openapi.NewPublisher(s))
go func() {
if err := s.relay.Init(); chk.E(err) {

View File

@@ -108,9 +108,9 @@ func (s *Server) SpiderFetch(
if !noFetch && len(s.C.SpiderSeeds) > 0 {
// we need to search the spider seeds.
// Break up pubkeys into batches of 32
for i := 0; i < len(pubkeys); i += 32 {
end := i + 32
// Break up pubkeys into batches of 128
for i := 0; i < len(pubkeys); i += 128 {
end := i + 128
if end > len(pubkeys) {
end = len(pubkeys)
}

View File

@@ -2,41 +2,19 @@ package relay
import (
"bytes"
"orly.dev/pkg/crypto/ec/bech32"
"orly.dev/pkg/encoders/bech32encoding"
"orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/kinds"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/keys"
"orly.dev/pkg/utils/log"
)
func (s *Server) Spider(noFetch ...bool) (err error) {
var ownersPubkeys [][]byte
for _, v := range s.C.Owners {
var prf []byte
var pk []byte
var bits5 []byte
if prf, bits5, err = bech32.DecodeNoLimit([]byte(v)); chk.D(err) {
// try hex then
if _, err = hex.DecBytes(pk, []byte(v)); chk.E(err) {
log.W.F(
"owner key %s is neither bech32 npub nor hex",
v,
)
continue
}
} else {
if !bytes.Equal(prf, bech32encoding.NpubHRP) {
log.W.F(
"owner key %s is neither bech32 npub nor hex",
v,
)
continue
}
if pk, err = bech32.ConvertBits(bits5, 5, 8, false); chk.E(err) {
continue
}
if pk, err = keys.DecodeNpubOrHex(v); chk.E(err) {
continue
}
// owners themselves are on the OwnersFollowed list as first level
ownersPubkeys = append(ownersPubkeys, pk)

View File

@@ -10,9 +10,8 @@ import (
)
func (s *Server) UserAuth(
r *http.Request, remote string,
tolerance ...time.Duration,
) (authed bool, pubkey []byte) {
r *http.Request, remote string, tolerance ...time.Duration,
) (authed bool, pubkey []byte, super bool) {
var valid bool
var err error
var tolerate time.Duration
@@ -35,5 +34,17 @@ func (s *Server) UserAuth(
return
}
}
// if the client is one of the relay cluster replicas, also set the super
// flag to indicate that privilege checks can be bypassed.
if len(s.Peers.Pubkeys) > 0 {
for _, pk := range s.Peers.Pubkeys {
if bytes.Equal(pk, pubkey) {
authed = true
super = true
pubkey = pk
return
}
}
}
return
}

View File

@@ -23,13 +23,14 @@ type I interface {
) (allowed *filters.T, accept bool, modified bool)
AddEvent(
c context.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
pubkey []byte,
) (accepted bool, message []byte)
AdminAuth(
r *http.Request, remote string, tolerance ...time.Duration,
) (authed bool, pubkey []byte)
UserAuth(
r *http.Request, remote string, tolerance ...time.Duration,
) (authed bool, pubkey []byte)
) (authed bool, pubkey []byte, super bool)
Context() context.T
Publisher() *publish.S
Publish(c context.T, evt *event.E) (err error)

View File

@@ -20,8 +20,8 @@ const (
NIP98Prefix = "Nostr"
)
// MakeNIP98Event creates a new NIP-98 event. If expiry is given, method is ignored, otherwise
// either option is the same.
// MakeNIP98Event creates a new NIP-98 event. If expiry is given, method is
// ignored; otherwise either option is the same.
func MakeNIP98Event(u, method, hash string, expiry int64) (ev *event.E) {
var t []*tag.T
t = append(t, tag.New("u", u))
@@ -47,18 +47,30 @@ func MakeNIP98Event(u, method, hash string, expiry int64) (ev *event.E) {
return
}
func CreateNIP98Blob(
ur, method, hash string, expiry int64, sign signer.I,
) (blob string, err error) {
ev := MakeNIP98Event(ur, method, hash, expiry)
if err = ev.Sign(sign); chk.E(err) {
return
}
log.T.F("nip-98 http auth event:\n%s\n", ev.SerializeIndented())
blob = base64.URLEncoding.EncodeToString(ev.Serialize())
return
}
// AddNIP98Header creates a NIP-98 http auth event and adds the standard header to a provided
// http.Request.
func AddNIP98Header(
r *http.Request, ur *url.URL, method, hash string,
sign signer.I, expiry int64,
) (err error) {
ev := MakeNIP98Event(ur.String(), method, hash, expiry)
if err = ev.Sign(sign); chk.E(err) {
var b64 string
if b64, err = CreateNIP98Blob(
ur.String(), method, hash, expiry, sign,
); chk.E(err) {
return
}
log.T.F("nip-98 http auth event:\n%s\n", ev.SerializeIndented())
b64 := base64.URLEncoding.EncodeToString(ev.Serialize())
r.Header.Add(HeaderKey, "Nostr "+b64)
return
}

View File

@@ -5,30 +5,5 @@ import (
)
func TestMakeNIP98Request_ValidateNIP98Request(t *testing.T) {
// lol.SetLogLevel("trace")
// sign := new(p256k.Signer)
// err := sign.Generate()
// if chk.E(err) {
// t.Fatal(err)
// }
// // var ur *url.URL
// // if ur, err = url.Parse("https://example.com/getnpubs?a=b&c=d"); chk.E(err) {
// // t.Fatal(err)
// // }
// var r *http.Request
// // if r, err = MakeNIP98GetRequest(ur, "test/0.0.0", sign); chk.E(err) {
// // t.Fatal(err)
// // }
// var pk []byte
// var valid bool
// if valid, pk, err = CheckAuth(r, nil); chk.E(err) {
// t.Fatal(err)
// }
// if !valid {
// t.Fatal("request event signature not valid")
// }
// if !bytes.Equal(pk, sign.Pub()) {
// t.Fatalf("unexpected pubkey in nip-98 http auth event: %0x expected %0x",
// pk, sign.Pub())
// }
}

View File

@@ -19,9 +19,10 @@ var ErrMissingKey = fmt.Errorf(
"'%s' key missing from request header", HeaderKey,
)
// CheckAuth verifies a received http.Request has got a valid authentication event in it, withan
// optional specification for tolerance of before and after, and provides the public key that
// should be verified to be authorized to access the resource associated with the request.
// CheckAuth verifies a received http.Request has got a valid authentication
// event in it, with an optional specification for tolerance of before and
// after, and provides the public key that should be verified to be authorized
// to access the resource associated with the request.
func CheckAuth(r *http.Request, tolerance ...time.Duration) (
valid bool,
pubkey []byte, err error,
@@ -70,7 +71,7 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
err = errorf.E("rem", rem)
return
}
log.T.F("received http auth event:\n%s\n", ev.SerializeIndented())
// log.T.F("received http auth event:\n%s\n", ev.SerializeIndented())
// The kind MUST be 27235.
if !ev.Kind.Equal(kind.HTTPAuth) {
err = errorf.E(
@@ -80,7 +81,8 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
)
return
}
// if there is an expiration timestamp it supersedes the created_at for validity.
// if there is an expiration timestamp, check it supersedes the
// created_at for validity.
exp := ev.Tags.GetAll(tag.New("expiration"))
if exp.Len() > 1 {
err = errorf.E(
@@ -106,8 +108,8 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
}
expiring = true
} else {
// The created_at timestamp MUST be within a reasonable time window (suggestion 60
// seconds)
// The created_at timestamp MUST be within a reasonable time window
// (suggestion 60 seconds)
ts := ev.CreatedAt.I64()
tn := time.Now().Unix()
if ts < tn-tolerate || ts > tn+tolerate {
@@ -126,10 +128,11 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
return
}
uts := ut.ToSliceOfTags()
// The u tag MUST be exactly the same as the absolute request URL (including query
// parameters).
// The u tag MUST be exactly the same as the absolute request URL
// (including query parameters).
proto := r.URL.Scheme
// if this came through a proxy we need to get the protocol to match the event
// if this came through a proxy, we need to get the protocol to match
// the event
if p := r.Header.Get("X-Forwarded-Proto"); p != "" {
proto = p
}
@@ -138,11 +141,10 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
}
fullUrl := proto + "://" + r.Host + r.URL.RequestURI()
evUrl := string(uts[0].Value())
// log.I.S(r)
log.T.F("full URL: %s event u tag value: %s", fullUrl, evUrl)
if expiring {
// if it is expiring, the URL only needs to be the same prefix to allow its use with
// multiple endpoints.
// if it is expiring, the URL only needs to be the same prefix to
// allow its use with multiple endpoints.
if !strings.HasPrefix(fullUrl, evUrl) {
err = errorf.E(
"request URL %s is not prefixed with the u tag URL %s",
@@ -158,7 +160,8 @@ func CheckAuth(r *http.Request, tolerance ...time.Duration) (
return
}
if !expiring {
// The method tag MUST be the same HTTP method used for the requested resource.
// The method tag MUST be the same HTTP method used for the
// requested resource.
mt := ev.Tags.GetAll(tag.New("method"))
if mt.Len() != 1 {
err = errorf.E(

View File

@@ -98,14 +98,12 @@ func (x *Operations) RegisterEvent(api huma.API) {
) {
r := ctx.Value("http-request").(*http.Request)
remote := helpers.GetRemoteFromReq(r)
log.T.F(
"%s %s %s", r.URL.String(),
remote, input.Body,
)
var authed bool
var authed, super bool
var pubkey []byte
if x.I.AuthRequired() {
authed, pubkey = x.UserAuth(r, remote)
authed, pubkey, super = x.UserAuth(r, remote)
if !authed {
err = huma.Error401Unauthorized("Not Authorized")
return
@@ -118,6 +116,14 @@ func (x *Operations) RegisterEvent(api huma.API) {
)
return
}
log.T.C(
func() string {
return fmt.Sprintf(
"%s %s %s", r.URL.String(),
remote, ev.Marshal(nil),
)
},
)
// these aliases make it so most of the following code can be copied
// verbatim from its counterpart in socketapi.HandleEvent, with the
// aid of a different implementation of the openapi.OK type.
@@ -156,7 +162,7 @@ func (x *Operations) RegisterEvent(api huma.API) {
}
// check that relay policy allows this event
accept, notice, _ := x.I.AcceptEvent(c, env, r, pubkey, remote)
if !accept {
if !accept && !super {
if err = Ok.Blocked(
a, env, notice,
); chk.E(err) {
@@ -166,7 +172,9 @@ func (x *Operations) RegisterEvent(api huma.API) {
}
// check for protected tag (NIP-70)
protectedTag := ev.Tags.GetFirst(tag.New("-"))
if protectedTag != nil && a.AuthRequired() {
// if the super flag was set protected is ignored because the relay
// cluster replicas must replicate this event (and all events).
if protectedTag != nil && a.AuthRequired() && !super {
// check that the pubkey of the event matches the authed pubkey
if !bytes.Equal(pubkey, ev.Pubkey) {
if err = Ok.Blocked(
@@ -212,13 +220,13 @@ func (x *Operations) RegisterEvent(api huma.API) {
return
}
// If we found the referenced event, check if the author
// matches
// If we found the referenced event, check if the
// author matches
if len(referencedEvents) > 0 {
referencedEvent := referencedEvents[0]
// Check if the author of the deletion event matches the
// author of the referenced event
// Check if the author of the deletion event
// matches the author of the referenced event
if !bytes.Equal(
referencedEvent.Pubkey, env.Pubkey,
) {
@@ -242,8 +250,8 @@ func (x *Operations) RegisterEvent(api huma.API) {
return
}
// Use DeleteEvent to actually delete the referenced
// event
// Use DeleteEvent to actually delete the
// referenced event
if err = sto.DeleteEvent(c, eid); chk.E(err) {
if err = Ok.Error(
a, env,
@@ -404,8 +412,8 @@ func (x *Operations) RegisterEvent(api huma.API) {
}
// Check if this event has been deleted before
if ev.Kind.K != kind.Deletion.K {
// Create a filter to check for deletion events that reference this
// event ID
// Create a filter to check for deletion events that
// reference this event ID
f := filter.New()
f.Kinds.K = []*kind.T{kind.Deletion}
f.Tags.AppendTags(tag.New([]byte{'e'}, ev.ID))
@@ -426,11 +434,9 @@ func (x *Operations) RegisterEvent(api huma.API) {
}
}
var reason []byte
ok, reason = x.I.AddEvent(
c, x.Relay(), ev, r, remote,
)
log.I.F("event %0x added %v %s", ev.ID, ok, reason)
if !ok {
ok, reason = x.I.AddEvent(c, x.Relay(), ev, r, remote, pubkey)
log.I.F("http API event %0x added %v %s", ev.ID, ok, reason)
if !ok && err != nil {
if err = Ok.Error(
a, env, err.Error(),
); chk.E(err) {

View File

@@ -610,12 +610,12 @@ Returns events as a JSON array of event objects.`
) {
r := ctx.Value("http-request").(*http.Request)
remote := helpers.GetRemoteFromReq(r)
var authed bool
var authed, super bool
var pubkey []byte
// if auth is required and not public readable, the request is not
// authorized.
if x.I.AuthRequired() && !x.I.PublicReadable() {
authed, pubkey = x.UserAuth(r, remote)
authed, pubkey, super = x.UserAuth(r, remote)
if !authed {
err = huma.Error401Unauthorized("Not Authorized")
return
@@ -652,7 +652,8 @@ Returns events as a JSON array of event objects.`
continue
}
// filter events the authed pubkey is not privileged to fetch.
if x.AuthRequired() && len(pubkey) > 0 {
// relay replicas don't have this limitation.
if x.AuthRequired() && len(pubkey) > 0 && !super {
var tmp event.S
for _, ev := range events {
if !auth.CheckPrivilege(pubkey, ev) {

View File

@@ -55,7 +55,7 @@ Many browsers have a limited number of SSE channels that can be open at once, so
var authed bool
var pubkey []byte
if x.I.AuthRequired() && !x.I.PublicReadable() {
authed, pubkey = x.UserAuth(r, remote)
authed, pubkey, _ = x.UserAuth(r, remote)
if !authed {
err = huma.Error401Unauthorized("Not Authorized")
return

View File

@@ -54,6 +54,7 @@ func (m *mockServer) AcceptReq(
func (m *mockServer) AddEvent(
c ctx.T, rl relay.I, ev *event.E, hr *http.Request, origin string,
pubkey []byte,
) (accepted bool, message []byte) {
return true, nil
}
@@ -66,8 +67,8 @@ func (m *mockServer) AdminAuth(
func (m *mockServer) UserAuth(
r *http.Request, remote string, tolerance ...time.Duration,
) (authed bool, pubkey []byte) {
return false, nil
) (authed bool, pubkey []byte, super bool) {
return false, nil, super
}
func (m *mockServer) Publish(c ctx.T, evt *event.E) (err error) {

View File

@@ -391,7 +391,7 @@ func (a *A) HandleEvent(
}
}
var reason []byte
ok, reason = srv.AddEvent(c, rl, env.E, a.Req(), a.RealRemote())
ok, reason = srv.AddEvent(c, rl, env.E, a.Req(), a.RealRemote(), nil)
log.I.F("event %0x added %v %s", env.E.ID, ok, reason)
if err = okenvelope.NewFrom(env.E.ID, ok).Write(a.Listener); chk.E(err) {
return

65
pkg/utils/keys/keys.go Normal file
View File

@@ -0,0 +1,65 @@
package keys
import (
"bytes"
"orly.dev/pkg/crypto/ec/bech32"
"orly.dev/pkg/encoders/bech32encoding"
"orly.dev/pkg/encoders/hex"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log"
)
func DecodeNpubOrHex(v string) (pk []byte, err error) {
var prf []byte
var bits5 []byte
if prf, bits5, err = bech32.DecodeNoLimit([]byte(v)); chk.D(err) {
// try hex then
if _, err = hex.DecBytes(pk, []byte(v)); chk.E(err) {
log.W.F(
"owner key %s is neither bech32 npub nor hex",
v,
)
return
}
// it was hex, return
return
}
if !bytes.Equal(prf, bech32encoding.NpubHRP) {
log.W.F(
"owner key %s is neither bech32 npub nor hex",
v,
)
return
}
if pk, err = bech32.ConvertBits(bits5, 5, 8, false); chk.E(err) {
return
}
return
}
func DecodeNsecOrHex(v string) (sk []byte, err error) {
var prf []byte
var bits5 []byte
if prf, bits5, err = bech32.DecodeNoLimit([]byte(v)); chk.D(err) {
// try hex then
if _, err = hex.DecBytes(sk, []byte(v)); chk.E(err) {
log.W.F(
"owner key %s is neither bech32 nsec nor hex",
v,
)
return
}
return
}
if !bytes.Equal(prf, bech32encoding.NsecHRP) {
log.W.F(
"owner key %s is neither bech32 nsec nor hex",
v,
)
return
}
if sk, err = bech32.ConvertBits(bits5, 5, 8, false); chk.E(err) {
return
}
return
}