starting with database stuff

This commit is contained in:
2025-07-04 13:11:57 +01:00
parent e28b163966
commit 972cc7dbee
25 changed files with 1362 additions and 43 deletions

41
.idea/workspace.xml generated
View File

@@ -4,13 +4,32 @@
<option name="autoReloadType" value="ALL" />
</component>
<component name="ChangeListManager">
<list default="true" id="848aca24-a3ec-4e50-a3d5-7b132d168000" name="Changes" comment="add HTTP handler to socketapi and refactor logging configuration">
<change beforePath="$PROJECT_DIR$/.idea/dictionaries/project.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/dictionaries/project.xml" afterDir="false" />
<list default="true" id="848aca24-a3ec-4e50-a3d5-7b132d168000" name="Changes" comment="enhance socketapi with HTTP handler, update project name, and refine logging">
<change afterPath="$PROJECT_DIR$/database/database.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/database/logger.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/interfaces/listener/listener.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/interfaces/typer/typer.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/reason/reason.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/server/add-event.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/socketapi/handleEvent.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/socketapi/handleMessage.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/socketapi/ok.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/socketapi/publisher.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/ws/connection.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/ws/listener.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/servemux/serveMux.go" beforeDir="false" afterPath="$PROJECT_DIR$/servemux/serveMux.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/go.mod" beforeDir="false" afterPath="$PROJECT_DIR$/go.mod" afterDir="false" />
<change beforePath="$PROJECT_DIR$/go.sum" beforeDir="false" afterPath="$PROJECT_DIR$/go.sum" afterDir="false" />
<change beforePath="$PROJECT_DIR$/helpers/helpers.go" beforeDir="false" afterPath="$PROJECT_DIR$/helpers/helpers.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/interfaces/server/server.go" beforeDir="false" afterPath="$PROJECT_DIR$/interfaces/server/server.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/main.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/publish/publisher.go" beforeDir="false" afterPath="$PROJECT_DIR$/publish/publisher.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/publish/publisher/interface.go" beforeDir="false" afterPath="$PROJECT_DIR$/interfaces/publisher/publisher.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/server/server.go" beforeDir="false" afterPath="$PROJECT_DIR$/server/server.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/socketapi/socketapi.go" beforeDir="false" afterPath="$PROJECT_DIR$/socketapi/socketapi.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/version/version.go" beforeDir="false" afterPath="$PROJECT_DIR$/version/version.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/store/alias.go" beforeDir="false" afterPath="$PROJECT_DIR$/interfaces/store/alias.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/store/errors.go" beforeDir="false" afterPath="$PROJECT_DIR$/interfaces/store/errors.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/store/store_interface.go" beforeDir="false" afterPath="$PROJECT_DIR$/interfaces/store/store_interface.go" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -76,7 +95,7 @@
"go.import.settings.migrated": "true",
"go.sdk.automatically.set": "true",
"junie.onboarding.icon.badge.shown": "true",
"last_opened_file_path": "/home/david/src/not.realy.lol",
"last_opened_file_path": "/home/david/src/not.realy.lol/database",
"node.js.detected.package.eslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
@@ -84,8 +103,17 @@
}
}]]></component>
<component name="RecentsManager">
<key name="GoMoveDeclarationDialog.RECENT_KEY">
<recent name="$PROJECT_DIR$/server" />
</key>
<key name="CopyFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/database" />
<recent name="$PROJECT_DIR$" />
<recent name="$PROJECT_DIR$/socketapi" />
<recent name="$PROJECT_DIR$/interfaces" />
</key>
<key name="MoveFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/interfaces" />
</key>
</component>
<component name="RunManager" selected="Go Build.go build not.realy.lol">
@@ -156,7 +184,8 @@
<MESSAGE value="getting started on the server" />
<MESSAGE value="implement relayinfo" />
<MESSAGE value="add HTTP handler to socketapi and refactor logging configuration" />
<option name="LAST_COMMIT_MESSAGE" value="add HTTP handler to socketapi and refactor logging configuration" />
<MESSAGE value="enhance socketapi with HTTP handler, update project name, and refine logging" />
<option name="LAST_COMMIT_MESSAGE" value="enhance socketapi with HTTP handler, update project name, and refine logging" />
<option name="NON_MODAL_COMMIT_POSTPONE_SLOW_CHECKS" value="false" />
</component>
<component name="VgoProject">

110
database/database.go Normal file
View File

@@ -0,0 +1,110 @@
package database
import (
"github.com/dgraph-io/badger/v4"
"io"
"not.realy.lol/chk"
"not.realy.lol/context"
"not.realy.lol/event"
"not.realy.lol/eventid"
"not.realy.lol/eventidserial"
"not.realy.lol/filter"
"not.realy.lol/interfaces/store"
"not.realy.lol/log"
"not.realy.lol/lol"
"not.realy.lol/units"
)
type D struct {
ctx context.T
cancel context.F
dataDir string
Logger *logger
*badger.DB
seq *badger.Sequence
}
func New(ctx context.T, cancel context.F, dataDir, logLevel string) (
d *D, err error,
) {
d = &D{
ctx: ctx,
cancel: cancel,
dataDir: dataDir,
Logger: NewLogger(lol.GetLogLevel(logLevel), dataDir),
DB: nil,
seq: nil,
}
opts := badger.DefaultOptions(d.dataDir)
opts.BlockCacheSize = int64(units.Gb)
opts.BlockSize = units.Gb
opts.CompactL0OnClose = true
opts.LmaxCompaction = true
if d.DB, err = badger.Open(opts); chk.E(err) {
return
}
log.I.Ln("getting event sequence lease", d.dataDir)
if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) {
return
}
go func() {
<-d.ctx.Done()
d.cancel()
d.seq.Release()
d.DB.Close()
}()
return
}
// Path returns the path where the database files are stored.
func (d *D) Path() string { return d.dataDir }
func (d *D) Wipe() (err error) {
// TODO implement me
panic("implement me")
}
func (d *D) QueryEvents(c context.T, f *filter.T) (evs event.S, err error) {
// TODO implement me
panic("implement me")
}
func (d *D) QueryForIds(c context.T, f *filter.T) (
evs []store.IdTsPk, err error,
) {
// TODO implement me
panic("implement me")
}
func (d *D) DeleteEvent(
c context.T, ev *eventid.T, noTombstone ...bool,
) (err error) {
// TODO implement me
panic("implement me")
}
func (d *D) SaveEvent(c context.T, ev *event.E) (err error) {
// TODO implement me
panic("implement me")
}
func (d *D) Import(r io.Reader) {
// TODO implement me
panic("implement me")
}
func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
// TODO implement me
panic("implement me")
}
func (d *D) SetLogLevel(level string) {
d.Logger.SetLogLevel(lol.GetLogLevel(level))
}
func (d *D) EventIdsBySerial(start uint64, count int) (
evs []eventidserial.E, err error,
) {
// TODO implement me
panic("implement me")
}

69
database/logger.go Normal file
View File

@@ -0,0 +1,69 @@
package database
import (
"fmt"
"runtime"
"strings"
"go.uber.org/atomic"
"not.realy.lol/log"
"not.realy.lol/lol"
)
// NewLogger creates a new badger logger.
func NewLogger(logLevel int, label string) (l *logger) {
log.T.Ln("getting logger for", label)
l = &logger{Label: label}
l.Level.Store(int32(logLevel))
return
}
type logger struct {
Level atomic.Int32
Label string
}
// SetLogLevel atomically adjusts the log level to the given log level code.
func (l *logger) SetLogLevel(level int) {
l.Level.Store(int32(level))
}
// Errorf is a log printer for this level of message.
func (l *logger) Errorf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Error {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
log.E.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Warningf is a log printer for this level of message.
func (l *logger) Warningf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Warn {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
log.W.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Infof is a log printer for this level of message.
func (l *logger) Infof(s string, i ...interface{}) {
if l.Level.Load() >= lol.Info {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
log.I.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Debugf is a log printer for this level of message.
func (l *logger) Debugf(s string, i ...interface{}) {
if l.Level.Load() >= lol.Debug {
s = l.Label + ": " + s
txt := fmt.Sprintf(s, i...)
_, file, line, _ := runtime.Caller(2)
log.D.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}

22
go.mod
View File

@@ -6,7 +6,11 @@ require (
github.com/adrg/xdg v0.5.3
github.com/danielgtaylor/huma/v2 v2.34.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgraph-io/badger/v4 v4.7.0
github.com/fasthttp/websocket v1.5.12
github.com/fatih/color v1.18.0
github.com/gobwas/httphead v0.1.0
github.com/gobwas/ws v1.2.1
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/minio/sha256-simd v1.0.1
github.com/pkg/profile v1.7.0
@@ -20,13 +24,31 @@ require (
)
require (
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/fgprof v0.9.5 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/google/flatbuffers v25.2.10+incompatible // indirect
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 // indirect
github.com/templexxx/cpu v0.1.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.62.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sys v0.33.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

57
go.sum
View File

@@ -1,5 +1,9 @@
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
@@ -15,14 +19,36 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM=
github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE=
github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
github.com/felixge/fgprof v0.9.5 h1:8+vR6yu2vvSKn08urWyEuxx75NWPEvybbkBirEpsbVY=
github.com/felixge/fgprof v0.9.5/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 h1:xhMrHhTJ6zxu3gA4enFM9MLn9AY7613teCdFnlUVbSQ=
@@ -34,8 +60,14 @@ github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
@@ -50,8 +82,12 @@ github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDj
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc=
github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -63,19 +99,38 @@ github.com/templexxx/cpu v0.1.1 h1:isxHaxBXpYFWnk2DReuKkigaZyrjs2+9ypIdGP4h+HI=
github.com/templexxx/cpu v0.1.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk=
github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b h1:XeDLE6c9mzHpdv3Wb1+pWBaWv/BlHK0ZYIu/KaL6eHg=
github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b/go.mod h1:7rwmCH0wC2fQvNEvPZ3sKXukhyCTyiaZ5VTZMQYpZKQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.62.0 h1:8dKRBX/y2rCzyc6903Zu1+3qN0H/d2MsxPPmVNamiH0=
github.com/valyala/fasthttp v1.62.0/go.mod h1:FCINgr4GKdKqV8Q0xv8b+UxPV+H/O5nNFo3D+r54Htg=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
go-simpler.org/env v0.12.0 h1:kt/lBts0J1kjWJAnB740goNdvwNxt5emhYngL0Fzufs=
go-simpler.org/env v0.12.0/go.mod h1:cc/5Md9JCUM7LVLtN0HYjPTDcI3Q8TDaPlNTAlDU+WI=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -24,6 +24,7 @@ func GetRemoteFromReq(r *http.Request) (rr string) {
remoteAddress = r.Header.Get("Forwarded")
if remoteAddress == "" {
rr = r.RemoteAddr
return
} else {
splitted := strings.Split(remoteAddress, ", ")
if len(splitted) >= 1 {

View File

@@ -0,0 +1,7 @@
package listener
type I interface {
Write(p []byte) (n int, err error)
Close() error
Remote() string
}

View File

@@ -2,16 +2,13 @@ package publisher
import (
"not.realy.lol/event"
"not.realy.lol/interfaces/typer"
)
type Message interface {
Type() string
}
type I interface {
Message
typer.T
Deliver(ev *event.E)
Receive(msg Message)
Receive(msg typer.T)
}
type Publishers []I

View File

@@ -2,8 +2,18 @@ package server
import (
"net/http"
"not.realy.lol/context"
"not.realy.lol/event"
"not.realy.lol/interfaces/store"
)
type I interface {
HandleRelayInfo(w http.ResponseWriter, r *http.Request)
Context() context.T
HandleRelayInfo(
w http.ResponseWriter, r *http.Request,
)
Storage() store.I
AddEvent(
c context.T, ev *event.E, hr *http.Request, remote string,
) (accepted bool, message []byte)
}

View File

@@ -20,11 +20,11 @@ import (
// I am a type for a persistence layer for nostr events handled by a relay.
type I interface {
Initer
Pather
io.Closer
Pather
Wiper
Querier
Querent
Deleter
Saver
@@ -33,17 +33,6 @@ type I interface {
Syncer
LogLeveler
EventIdSerialer
Accountant
}
type Initer interface {
// Init is called at the very beginning by [Server.Start], after
// [relay.Init], allowing a storage to initialize its internal resources.
//
// The parameters can be used by the database implementations to set custom
// parameters such as cache management and other relevant parameters to the
// specific implementation.
Init(path string) (err error)
}
type Pather interface {

10
interfaces/typer/typer.go Normal file
View File

@@ -0,0 +1,10 @@
// Package typer is an interface for interfaces to use to identify their type simply for
// aggregating multiple self-registered interfaces such that the top level can recognise the
// type of a message and match it to the type of handler.
package typer
type T interface {
// Type returns a type identifier string to allow multiple self-registering publisher.I to
// be used with an abstraction to allow multiple APIs to publish.
Type() string
}

10
main.go
View File

@@ -8,6 +8,7 @@ import (
"not.realy.lol/chk"
"not.realy.lol/config"
"not.realy.lol/context"
"not.realy.lol/database"
"not.realy.lol/interrupt"
"not.realy.lol/log"
"not.realy.lol/lol"
@@ -51,6 +52,12 @@ func main() {
wg := &sync.WaitGroup{}
c, cancel := context.Cancel(context.Bg())
interrupt.AddHandler(func() { cancel() })
var sto *database.D
if sto, err = database.New(
c, cancel, cfg.DataDir, cfg.LogLevel,
); chk.E(err) {
return
}
serveMux := servemux.New()
s := &server.S{
Ctx: c,
@@ -59,10 +66,11 @@ func main() {
Addr: net.JoinHostPort(cfg.Listen, strconv.Itoa(cfg.Port)),
Mux: serveMux,
Cfg: cfg,
Store: sto,
}
wg.Add(1)
interrupt.AddHandler(func() { s.Shutdown() })
socketapi.New(s, "/{$}", serveMux)
socketapi.New(s, "/{$}", serveMux, socketapi.DefaultSocketParams())
if err = s.Start(); chk.E(err) {
os.Exit(1)
}

View File

@@ -1,26 +1,30 @@
// Package publisher is a singleton package that keeps track of subscriptions in
// both websockets and http SSE, including managing the authentication state of
// a connection.
// Package publisher is a singleton package that keeps track of subscriptions
// from relevant API connections.
package publish
import (
"not.realy.lol/event"
"not.realy.lol/publish/publisher"
"not.realy.lol/interfaces/publisher"
"not.realy.lol/interfaces/typer"
)
var P = &S{}
func (s *S) Register(p publisher.I) {
s.Publishers = append(s.Publishers, p)
}
// S is the control structure for the subscription management scheme.
type S struct {
publisher.Publishers
}
// New creates a new publish.S.
// New creates a new publisher.
func New(p ...publisher.I) (s *S) {
s = &S{Publishers: p}
return
}
var _ publisher.I = &S{}
func (s *S) Type() string { return "publish" }
func (s *S) Deliver(ev *event.E) {
@@ -30,7 +34,7 @@ func (s *S) Deliver(ev *event.E) {
}
}
func (s *S) Receive(msg publisher.Message) {
func (s *S) Receive(msg typer.T) {
t := msg.Type()
for _, p := range s.Publishers {
if p.Type() == t {

54
reason/reason.go Normal file
View File

@@ -0,0 +1,54 @@
package reason
import (
"bytes"
"errors"
"fmt"
)
// R is the machine-readable prefix before the colon in an OK or CLOSED envelope message.
// Below are the most common kinds that are mentioned in NIP-01.
type R []byte
var (
AuthRequired = R("auth-required")
PoW = R("pow")
Duplicate = R("duplicate")
Blocked = R("blocked")
RateLimited = R("rate-limited")
Invalid = R("invalid")
Error = R("error")
Unsupported = R("unsupported")
Restricted = R("restricted")
)
// S returns the R as a string
func (r R) S() string { return string(r) }
// B returns the R as a byte slice.
func (r R) B() []byte { return r }
// IsPrefix returns whether a text contains the same R prefix.
func (r R) IsPrefix(reason []byte) bool {
return bytes.HasPrefix(reason, r.B())
}
// F allows creation of a full R text with a printf style format.
func (r R) F(format string, params ...any) []byte {
return Msg(r, format, params...)
}
// Err allows creation of the error reason as an error
func (r R) Err(format string, params ...any) error {
b := Msg(r, format, params...)
return errors.New(string(b))
}
// Msg constructs a properly formatted message with a machine-readable prefix for OK and CLOSED
// envelopes.
func Msg(prefix R, format string, params ...any) []byte {
if len(prefix) < 1 {
prefix = Error
}
return []byte(fmt.Sprintf(prefix.S()+": "+format, params...))
}

17
server/add-event.go Normal file
View File

@@ -0,0 +1,17 @@
package server
import (
"net/http"
"not.realy.lol/context"
"not.realy.lol/event"
)
func (s *S) AddEvent(
c context.T, ev *event.E, hr *http.Request, remote string,
) (accepted bool, message []byte) {
// TODO implement me
panic("implement me")
}

View File

@@ -9,10 +9,9 @@ import (
"not.realy.lol/chk"
"not.realy.lol/config"
"not.realy.lol/context"
"not.realy.lol/helpers"
"not.realy.lol/interfaces/store"
"not.realy.lol/log"
"not.realy.lol/servemux"
"not.realy.lol/store"
"sync"
"time"
)
@@ -29,6 +28,8 @@ type S struct {
huma.API
}
func (s *S) Storage() store.I { return s.Store }
func (s *S) Init() {}
func (s *S) Start() (err error) {
@@ -62,8 +63,6 @@ func (s *S) Start() (err error) {
// ServeHTTP is the server http.Handler.
func (s *S) ServeHTTP(w http.ResponseWriter, r *http.Request) {
remote := helpers.GetRemoteFromReq(r)
log.T.F("server.S.ServeHTTP to %s", remote)
s.Mux.ServeHTTP(w, r)
}
@@ -76,3 +75,5 @@ func (s *S) Shutdown() {
chk.E(s.HTTPServer.Shutdown(s.Ctx))
s.WG.Done()
}
func (s *S) Context() context.T { return s.Ctx }

255
socketapi/handleEvent.go Normal file
View File

@@ -0,0 +1,255 @@
package socketapi
import (
"bytes"
"github.com/minio/sha256-simd"
"not.realy.lol/chk"
"not.realy.lol/context"
"not.realy.lol/envelopes/eventenvelope"
"not.realy.lol/envelopes/okenvelope"
"not.realy.lol/event"
"not.realy.lol/filter"
"not.realy.lol/hex"
"not.realy.lol/interfaces/server"
"not.realy.lol/interfaces/store"
"not.realy.lol/ints"
"not.realy.lol/kind"
"not.realy.lol/log"
"not.realy.lol/tag"
)
func (a *A) HandleEvent(r []byte, s server.I, remote string) (msg []byte) {
log.T.F("%s handleEvent %s", remote, r)
var err error
var ok bool
var rem []byte
sto := s.Storage()
if sto == nil {
panic("no event store has been set to store event")
}
env := eventenvelope.NewSubmission()
if rem, err = env.Unmarshal(r); chk.E(err) {
return
}
if len(rem) > 0 {
log.T.F("%s extra '%s'", remote, rem)
}
if err = a.VerifyEvent(env); chk.E(err) {
return
}
if env.E.Kind.K == kind.Deletion.K {
if err = a.CheckDelete(a.Context(), env, sto); chk.E(err) {
return
}
}
var reason []byte
ok, reason = s.AddEvent(
a.Context(), env.E, a.Listener.Req(), remote,
)
log.T.F("%s <- event added %v", remote, ok)
if err = okenvelope.NewFrom(
env.Id(), ok, reason,
).Write(a.Listener); chk.E(err) {
return
}
return
}
func (a *A) VerifyEvent(env *eventenvelope.Submission) (err error) {
if !bytes.Equal(env.GetIDBytes(), env.Id()) {
if err = Ok.Invalid(
a, env, "event id is computed incorrectly",
); chk.E(err) {
return
}
return
}
var ok bool
if ok, err = env.Verify(); chk.T(err) {
if err = Ok.Error(
a, env, "failed to verify signature", err,
); chk.T(err) {
return
}
return
} else if !ok {
if err = Ok.Error(a, env, "signature is invalid", err); chk.T(err) {
return
}
return
}
return
}
func (a *A) CheckDelete(
c context.T, env *eventenvelope.Submission, sto store.I,
) (err error) {
log.I.F("delete event\n%s", env.E.Serialize())
for _, t := range env.Tags.ToSliceOfTags() {
var res []*event.E
if t.Len() >= 2 {
switch {
case bytes.Equal(t.Key(), []byte("e")):
evId := make([]byte, sha256.Size)
if _, err = hex.DecBytes(evId, t.Value()); chk.E(err) {
continue
}
res, err = sto.QueryEvents(c, &filter.T{IDs: tag.New(evId)})
if err != nil {
if err = Ok.Error(
a, env, "failed to query for target event",
); chk.T(err) {
return
}
return
}
for i := range res {
if res[i].Kind.Equal(kind.Deletion) {
if err = Ok.Blocked(
a, env,
"not processing or storing delete event containing delete event references",
); chk.E(err) {
return
}
return
}
if !bytes.Equal(res[i].Pubkey, env.E.Pubkey) {
if err = Ok.Blocked(
a, env,
"cannot delete other users' events (delete by e tag)",
); chk.E(err) {
return
}
return
}
}
case bytes.Equal(t.Key(), []byte("a")):
split := bytes.Split(t.Value(), []byte{':'})
if len(split) != 3 {
continue
}
var pk []byte
if pk, err = hex.DecAppend(nil, split[1]); chk.E(err) {
if err = Ok.Invalid(
a, env,
"delete event a tag pubkey value invalid: %s",
t.Value(),
); chk.T(err) {
}
return
}
kin := ints.New(uint16(0))
if _, err = kin.Unmarshal(split[0]); chk.E(err) {
if err = Ok.Invalid(
a, env,
"delete event a tag kind value invalid: %s", t.Value(),
); chk.T(err) {
return
}
return
}
kk := kind.New(kin.Uint16())
if kk.Equal(kind.Deletion) {
if err = Ok.Blocked(
a, env, "delete event kind may not be deleted",
); chk.E(err) {
return
}
return
}
if !kk.IsParameterizedReplaceable() {
if err = Ok.Error(
a, env,
"delete tags with a tags containing non-parameterized-replaceable events cannot be processed",
); chk.E(err) {
return
}
return
}
if !bytes.Equal(pk, env.E.Pubkey) {
log.I.S(pk, env.E.Pubkey, env.E)
if err = Ok.Blocked(
a, env,
"cannot delete other users' events (delete by a tag)",
); chk.E(err) {
return
}
return
}
f := filter.New()
f.Kinds.K = []*kind.T{kk}
f.Authors.Append(pk)
f.Tags.AppendTags(tag.New([]byte{'#', 'd'}, split[2]))
if res, err = sto.QueryEvents(c, f); err != nil {
if err = Ok.Error(
a, env,
"failed to query for target event",
); chk.T(err) {
return
}
return
}
}
}
if len(res) < 1 {
continue
}
var resTmp event.S
for _, v := range res {
if env.E.CreatedAt.U64() >= v.CreatedAt.U64() {
resTmp = append(resTmp, v)
}
}
res = resTmp
for _, target := range res {
var skip bool
if skip, err = a.ProcessDelete(c, target, env, sto); skip {
continue
} else if err != nil {
return
}
}
res = nil
}
if err = okenvelope.NewFrom(env.Id(), true).Write(a.Listener); chk.E(err) {
return
}
return
}
func (a *A) ProcessDelete(
c context.T, target *event.E, env *eventenvelope.Submission,
sto store.I,
) (skip bool, err error) {
if target.Kind.K == kind.Deletion.K {
if err = Ok.Error(
a, env, "cannot delete delete event %s", env.Id,
); chk.E(err) {
return
}
}
if target.CreatedAt.Int() > env.E.CreatedAt.Int() {
if err = Ok.Error(
a, env,
"not deleting\n%d%\nbecause delete event is older\n%d",
target.CreatedAt.Int(), env.E.CreatedAt.Int(),
); chk.E(err) {
return
}
skip = true
}
if !bytes.Equal(target.Pubkey, env.Pubkey) {
if err = Ok.Error(a, env, "only author can delete event"); chk.E(err) {
return
}
return
}
if err = sto.DeleteEvent(c, target.EventId()); chk.T(err) {
if err = Ok.Error(a, env, err.Error()); chk.T(err) {
return
}
return
}
return
}

View File

@@ -0,0 +1,45 @@
package socketapi
import (
"fmt"
"not.realy.lol/chk"
"not.realy.lol/envelopes"
"not.realy.lol/envelopes/authenvelope"
"not.realy.lol/envelopes/closeenvelope"
"not.realy.lol/envelopes/eventenvelope"
"not.realy.lol/envelopes/noticeenvelope"
"not.realy.lol/envelopes/reqenvelope"
"not.realy.lol/log"
)
func (a *A) HandleMessage(msg []byte, remote string) {
log.T.F("received message from %s\n%s", remote, msg)
var notice []byte
var err error
var t string
var rem []byte
if t, rem = envelopes.Identify(msg); chk.E(err) {
notice = []byte(err.Error())
}
switch t {
case eventenvelope.L:
notice = a.HandleEvent(rem, a.I, remote)
case reqenvelope.L:
// notice = a.HandleReq(
// a.Context(), rem, a.Server, a.Listener.AuthedBytes(), remote,
// )
case closeenvelope.L:
// notice = a.HandleClose(rem, a.Server)
case authenvelope.L:
// notice = a.HandleAuth(rem, a.Server)
default:
notice = []byte(fmt.Sprintf("unknown envelope type %s\n%s", t, rem))
}
if len(notice) > 0 {
log.D.F("notice->%s %s", remote, notice)
if err = noticeenvelope.NewFrom(notice).Write(a.Listener); err != nil {
return
}
}
}

110
socketapi/ok.go Normal file
View File

@@ -0,0 +1,110 @@
package socketapi
import (
"not.realy.lol/envelopes/eid"
"not.realy.lol/envelopes/okenvelope"
"not.realy.lol/reason"
)
type OK func(a *A, env eid.Ider, format string, params ...any) (err error)
type OKs struct {
AuthRequired OK
PoW OK
Duplicate OK
Blocked OK
RateLimited OK
Invalid OK
Error OK
Unsupported OK
Restricted OK
}
var Ok = OKs{
AuthRequired: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.AuthRequired.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.AuthRequired.Err(format, params...)
},
PoW: func(a *A, env eid.Ider, format string, params ...any) (err error) {
rr := reason.PoW.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.PoW.Err(format, params...)
},
Duplicate: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.Duplicate.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Duplicate.Err(format, params...)
},
Blocked: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.Blocked.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Blocked.Err(format, params...)
},
RateLimited: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.RateLimited.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.RateLimited.Err(format, params...)
},
Invalid: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.Invalid.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Invalid.Err(format, params...)
},
Error: func(a *A, env eid.Ider, format string, params ...any) (err error) {
rr := reason.Error.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Error.Err(format, params...)
},
Unsupported: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.Unsupported.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Unsupported.Err(format, params...)
},
Restricted: func(
a *A, env eid.Ider, format string, params ...any,
) (err error) {
rr := reason.Restricted.F(format, params...)
r := okenvelope.NewFrom(
env.Id(), false, rr,
)
r.Write(a.Listener)
return reason.Restricted.Err(format, params...)
},
}

130
socketapi/publisher.go Normal file
View File

@@ -0,0 +1,130 @@
package socketapi
import (
"not.realy.lol/interfaces/listener"
"not.realy.lol/interfaces/typer"
"not.realy.lol/log"
"regexp"
"sync"
"not.realy.lol/chk"
"not.realy.lol/envelopes/eventenvelope"
"not.realy.lol/event"
"not.realy.lol/filters"
"not.realy.lol/interfaces/publisher"
"not.realy.lol/publish"
)
const Type = "socketapi"
var (
NIP20prefixmatcher = regexp.MustCompile(`^\w+: `)
)
// Map is a map of filters associated with a collection of ws.Listener connections.
type Map map[listener.I]map[string]*filters.T
type W struct {
listener.I
// If Cancel is true, this is a close command.
Cancel bool
// Id is the subscription Id. If Cancel is true, cancel the named
// subscription, otherwise, cancel the publisher for the socket.
Id string
Receiver event.C
Filters *filters.T
}
func (w *W) Type() string { return Type }
type Close struct {
listener.I
Id string
}
type S struct {
// Mx is the mutex for the Map.
Mx sync.Mutex
// Map is the map of subscribers and subscriptions from the websocket api.
Map
}
var _ publisher.I = &S{}
func init() {
publish.P.Register(NewPublisher())
}
func NewPublisher() *S { return &S{Map: make(Map)} }
func (p *S) Type() string { return Type }
func (p *S) Receive(msg typer.T) {
if m, ok := msg.(*W); ok {
if m.Cancel {
if m.Id == "" {
log.T.F("removing subscriber %s", m.I.Remote())
p.removeSubscriber(m.I)
} else {
log.T.F(
"removing subscription %s of %s",
m.Id, m.I.Remote(),
)
p.removeSubscriberId(m.I, m.Id)
}
return
}
p.Mx.Lock()
if subs, ok := p.Map[m.I]; !ok {
subs = make(map[string]*filters.T)
p.Map[m.I] = subs
} else {
subs[m.Id] = m.Filters
}
p.Mx.Unlock()
}
}
func (p *S) Deliver(ev *event.E) {
var err error
p.Mx.Lock()
for w, subs := range p.Map {
for id, subscriber := range subs {
if !subscriber.Match(ev) {
continue
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
continue
}
if err = res.Write(w); chk.E(err) {
continue
}
}
}
p.Mx.Unlock()
}
// removeSubscriberId removes a specific subscription from a subscriber websocket.
func (p *S) removeSubscriberId(ws listener.I, id string) {
p.Mx.Lock()
var subs map[string]*filters.T
var ok bool
if subs, ok = p.Map[ws]; ok {
delete(p.Map[ws], id)
_ = subs
if len(subs) == 0 {
delete(p.Map, ws)
}
}
p.Mx.Unlock()
}
// removeSubscriber removes a websocket from the S collection.
func (p *S) removeSubscriber(ws listener.I) {
p.Mx.Lock()
clear(p.Map[ws])
delete(p.Map, ws)
p.Mx.Unlock()
}

View File

@@ -1,24 +1,56 @@
package socketapi
import (
"github.com/fasthttp/websocket"
"net/http"
"not.realy.lol/chk"
"not.realy.lol/context"
"not.realy.lol/helpers"
"not.realy.lol/interfaces/server"
"not.realy.lol/log"
"not.realy.lol/publish"
"not.realy.lol/servemux"
"not.realy.lol/units"
"not.realy.lol/ws"
"strings"
"time"
)
type SocketParams struct {
WriteWait time.Duration
PongWait time.Duration
PingWait time.Duration
MaxMessageSize int64
}
func DefaultSocketParams() *SocketParams {
return &SocketParams{
WriteWait: 10 * time.Second,
PongWait: 60 * time.Second,
PingWait: 30 * time.Second,
MaxMessageSize: 1 * units.Mb,
}
}
type A struct {
Ctx context.T
server.I
// Web is an optional web server that appears on `/` with no Upgrade for
// websockets or Accept for application/nostr+json present.
Web http.Handler
*SocketParams
Listener *ws.Listener
}
func New(s server.I, path string, sm *servemux.S) {
a := &A{I: s}
var Upgrader = websocket.Upgrader{
ReadBufferSize: 1024, WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func New(s server.I, path string, sm *servemux.S, socketParams *SocketParams) {
a := &A{I: s, SocketParams: socketParams}
sm.Handle(path, a)
return
}
@@ -28,10 +60,9 @@ func New(s server.I, path string, sm *servemux.S) {
// processes WebSocket upgrade requests when applicable.
func (a *A) ServeHTTP(w http.ResponseWriter, r *http.Request) {
remote := helpers.GetRemoteFromReq(r)
log.T.F("socketAPI handling %s", remote)
if r.Header.Get("Upgrade") != "websocket" &&
r.Header.Get("Accept") == "application/nostr+json" {
log.T.F("serving realy info %s", remote)
log.T.F("serving relayinfo %s", remote)
a.I.HandleRelayInfo(w, r)
return
}
@@ -43,4 +74,109 @@ func (a *A) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}
var err error
ticker := time.NewTicker(a.PingWait)
var cancel context.F
a.Ctx, cancel = context.Cancel(a.I.Context())
var conn *websocket.Conn
if conn, err = Upgrader.Upgrade(w, r, nil); err != nil {
log.E.F("%s failed to upgrade websocket: %v", remote, err)
return
}
log.T.F(
"upgraded to websocket %s (remote %s local %s)", remote,
conn.RemoteAddr(), conn.LocalAddr(),
)
a.Listener = ws.NewListener(conn, r)
defer func() {
log.T.F("remote %s closed connection", remote)
cancel()
ticker.Stop()
publish.P.Receive(
&W{
Cancel: true,
I: a.Listener,
},
)
chk.E(a.Listener.Close())
}()
conn.SetReadLimit(a.MaxMessageSize)
chk.E(conn.SetReadDeadline(time.Now().Add(a.PongWait)))
conn.SetPongHandler(
func(string) error {
chk.E(conn.SetReadDeadline(time.Now().Add(a.PongWait)))
return nil
},
)
go a.Pinger(a.Ctx, ticker, cancel, remote)
var message []byte
var typ int
for {
select {
case <-a.Ctx.Done():
log.I.F("%s closing connection", remote)
a.Listener.Close()
return
default:
}
typ, message, err = conn.ReadMessage()
if err != nil {
if strings.Contains(
err.Error(), "use of closed network connection",
) {
return
}
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
) {
log.W.F(
"unexpected close error from %s: %v",
a.Listener.Request.Header.Get("X-Forwarded-For"), err,
)
}
return
}
if typ == websocket.PingMessage {
log.T.F("pinging %s", remote)
if _, err = a.Listener.Write(nil); chk.E(err) {
}
continue
}
go a.HandleMessage(message, remote)
}
}
func (a *A) Pinger(
ctx context.T, ticker *time.Ticker, cancel context.F, remote string,
) {
log.T.F("running pinger for %s", remote)
defer func() {
cancel()
ticker.Stop()
_ = a.Listener.Conn.Close()
log.T.F("stopped pinger for %s", remote)
}()
var err error
for {
select {
case <-ticker.C:
err = a.Listener.Conn.WriteControl(
websocket.PingMessage, nil,
time.Now().Add(a.PingWait),
)
if err != nil {
log.E.F(
"%s error writing ping: %v; closing websocket", remote, err,
)
return
}
case <-ctx.Done():
return
}
}
}

198
ws/connection.go Normal file
View File

@@ -0,0 +1,198 @@
package ws
import (
"bytes"
"compress/flate"
"crypto/tls"
"errors"
"io"
"net"
"net/http"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsflate"
"github.com/gobwas/ws/wsutil"
"not.realy.lol/chk"
"not.realy.lol/context"
"not.realy.lol/errorf"
"not.realy.lol/log"
)
// Connection is an outbound client -> relay connection.
type Connection struct {
conn net.Conn
enableCompression bool
controlHandler wsutil.FrameHandlerFunc
flateReader *wsflate.Reader
reader *wsutil.Reader
flateWriter *wsflate.Writer
writer *wsutil.Writer
msgStateR *wsflate.MessageState
msgStateW *wsflate.MessageState
}
// NewConnection creates a new Connection.
func NewConnection(
c context.T, url string, requestHeader http.Header,
tlsConfig *tls.Config,
) (*Connection, error) {
dialer := ws.Dialer{
Header: ws.HandshakeHeaderHTTP(requestHeader),
Extensions: []httphead.Option{
wsflate.DefaultParameters.Option(),
},
TLSConfig: tlsConfig,
}
conn, _, hs, err := dialer.Dial(c, url)
if err != nil {
return nil, errorf.E("failed to dial: %w", err)
}
enableCompression := false
state := ws.StateClientSide
for _, extension := range hs.Extensions {
if string(extension.Name) == wsflate.ExtensionName {
enableCompression = true
state |= ws.StateExtended
break
}
}
// reader
var flateReader *wsflate.Reader
var msgStateR wsflate.MessageState
if enableCompression {
msgStateR.SetCompressed(true)
flateReader = wsflate.NewReader(
nil, func(r io.Reader) wsflate.Decompressor {
return flate.NewReader(r)
},
)
}
controlHandler := wsutil.ControlFrameHandler(conn, ws.StateClientSide)
reader := &wsutil.Reader{
Source: conn,
State: state,
OnIntermediate: controlHandler,
CheckUTF8: false,
Extensions: []wsutil.RecvExtension{
&msgStateR,
},
}
// writer
var flateWriter *wsflate.Writer
var msgStateW wsflate.MessageState
if enableCompression {
msgStateW.SetCompressed(true)
flateWriter = wsflate.NewWriter(
nil, func(w io.Writer) wsflate.Compressor {
fw, err := flate.NewWriter(w, 4)
if err != nil {
log.E.F("Failed to create flate writer: %v", err)
}
return fw
},
)
}
writer := wsutil.NewWriter(conn, state, ws.OpText)
writer.SetExtensions(&msgStateW)
return &Connection{
conn: conn,
enableCompression: enableCompression,
controlHandler: controlHandler,
flateReader: flateReader,
reader: reader,
msgStateR: &msgStateR,
flateWriter: flateWriter,
writer: writer,
msgStateW: &msgStateW,
}, nil
}
// WriteMessage dispatches a message through the Connection.
func (cn *Connection) WriteMessage(c context.T, data []byte) error {
select {
case <-c.Done():
return errors.New("context canceled")
default:
}
if cn.msgStateW.IsCompressed() && cn.enableCompression {
cn.flateWriter.Reset(cn.writer)
if _, err := io.Copy(
cn.flateWriter, bytes.NewReader(data),
); chk.T(err) {
return errorf.E("failed to write message: %w", err)
}
if err := cn.flateWriter.Close(); chk.T(err) {
return errorf.E("failed to close flate writer: %w", err)
}
} else {
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
return errorf.E("failed to write message: %w", err)
}
}
if err := cn.writer.Flush(); chk.T(err) {
return errorf.E("failed to flush writer: %w", err)
}
return nil
}
// ReadMessage picks up the next incoming message on a Connection.
func (cn *Connection) ReadMessage(c context.T, buf io.Writer) error {
for {
select {
case <-c.Done():
return errors.New("context canceled")
default:
}
h, err := cn.reader.NextFrame()
if err != nil {
cn.conn.Close()
return errorf.E("failed to advance frame: %w", err)
}
if h.OpCode.IsControl() {
if err := cn.controlHandler(h, cn.reader); chk.T(err) {
return errorf.E("failed to handle control frame: %w", err)
}
} else if h.OpCode == ws.OpBinary ||
h.OpCode == ws.OpText {
break
}
if err := cn.reader.Discard(); chk.T(err) {
return errorf.E("failed to discard: %w", err)
}
}
if cn.msgStateR.IsCompressed() && cn.enableCompression {
cn.flateReader.Reset(cn.reader)
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
return errorf.E("failed to read message: %w", err)
}
} else {
if _, err := io.Copy(buf, cn.reader); chk.T(err) {
return errorf.E("failed to read message: %w", err)
}
}
return nil
}
// Close the Connection.
func (cn *Connection) Close() error {
return cn.conn.Close()
}

62
ws/listener.go Normal file
View File

@@ -0,0 +1,62 @@
// Package ws implements nostr websockets with their authentication state.
package ws
import (
"net/http"
"not.realy.lol/helpers"
"strings"
"sync"
"github.com/fasthttp/websocket"
"go.uber.org/atomic"
)
// Listener is a websocket implementation for a relay listener.
type Listener struct {
mutex sync.Mutex
Conn *websocket.Conn
Request *http.Request
remote atomic.String
}
// NewListener creates a new Listener for listening for inbound connections for
// a relay.
func NewListener(
conn *websocket.Conn,
req *http.Request,
) (ws *Listener) {
ws = &Listener{Conn: conn, Request: req}
ws.remote.Store(helpers.GetRemoteFromReq(req))
return
}
// Write a message to send to a client.
func (ws *Listener) Write(p []byte) (n int, err error) {
ws.mutex.Lock()
defer ws.mutex.Unlock()
err = ws.Conn.WriteMessage(websocket.TextMessage, p)
if err != nil {
n = len(p)
if strings.Contains(err.Error(), "close sent") {
_ = ws.Close()
err = nil
return
}
}
return
}
// Remote returns the stored remote address of the client.
func (ws *Listener) Remote() string {
return ws.remote.Load()
}
// Req returns the http.Request associated with the client connection to the
// Listener.
func (ws *Listener) Req() *http.Request {
return ws.Request
}
// Close the Listener connection from the Listener side.
func (ws *Listener) Close() (err error) { return ws.Conn.Close() }