Compare commits

...

2 Commits

Author SHA1 Message Date
354a2f1cda Enhance WebSocket write handling and connection management
- Introduced a buffered write channel and a dedicated write worker goroutine to serialize write operations, preventing concurrent write panics.
- Updated the Write and WriteControl methods to send messages through the write channel, improving error handling and connection stability.
- Refactored ping and pong handlers to utilize the new write channel for sending control messages.
- Enhanced publisher logic to manage write channels for WebSocket connections, ensuring efficient message delivery and error handling.
- Bumped version to v0.23.0 to reflect these changes.
2025-11-02 17:02:28 +00:00
0123c2d6f5 Update dependencies and refactor p256k crypto package
- Bumped version of lol.mleku.dev from v1.0.4 to v1.0.5.
- Added new dependencies: p256k1.mleku.dev and several indirect dependencies for improved cryptographic functionality.
- Refactored p256k package to utilize p256k1.mleku.dev/signer for signature operations, replacing the previous btcec implementation.
- Removed the secp256k1.go file, consolidating the crypto logic under the new p256k1 library.
- Updated documentation to reflect changes in the signer interface and usage.
2025-11-02 16:43:58 +00:00
13 changed files with 274 additions and 704 deletions

View File

@@ -83,6 +83,16 @@ whitelist:
remote: remote,
req: r,
startTime: time.Now(),
writeChan: make(chan WriteRequest, 100), // Buffered channel for writes
writeDone: make(chan struct{}),
}
// Start write worker goroutine
go listener.writeWorker()
// Register write channel with publisher
if socketPub := listener.publishers.GetSocketPublisher(); socketPub != nil {
socketPub.SetWriteChan(conn, listener.writeChan)
}
// Check for blacklisted IPs
@@ -110,12 +120,14 @@ whitelist:
return nil
})
// Set ping handler - extends read deadline when pings are received
conn.SetPingHandler(func(string) error {
// Send pong through write channel
conn.SetPingHandler(func(msg string) error {
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(DefaultWriteTimeout))
deadline := time.Now().Add(DefaultWriteTimeout)
return listener.WriteControl(websocket.PongMessage, []byte{}, deadline)
})
// Don't pass cancel to Pinger - it should not be able to cancel the connection context
go s.Pinger(ctx, conn, ticker)
go s.Pinger(ctx, listener, ticker)
defer func() {
log.D.F("closing websocket connection from %s", remote)
@@ -123,6 +135,11 @@ whitelist:
cancel()
ticker.Stop()
// Close write channel to signal worker to exit
close(listener.writeChan)
// Wait for write worker to finish
<-listener.writeDone
// Cancel all subscriptions for this connection
log.D.F("cancelling subscriptions for %s", remote)
listener.publishers.Receive(&W{
@@ -222,11 +239,10 @@ whitelist:
}
if typ == websocket.PingMessage {
log.D.F("received PING from %s, sending PONG", remote)
// Create a write context with timeout for pong response
// Send pong through write channel
deadline := time.Now().Add(DefaultWriteTimeout)
conn.SetWriteDeadline(deadline)
pongStart := time.Now()
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
if err = listener.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
pongDuration := time.Since(pongStart)
// Check if this is a timeout vs a connection error
@@ -279,7 +295,7 @@ whitelist:
}
func (s *Server) Pinger(
ctx context.Context, conn *websocket.Conn, ticker *time.Ticker,
ctx context.Context, listener *Listener, ticker *time.Ticker,
) {
defer func() {
log.D.F("pinger shutting down")
@@ -295,12 +311,11 @@ func (s *Server) Pinger(
pingCount++
log.D.F("sending PING #%d", pingCount)
// Set write deadline for ping operation
// Send ping through write channel
deadline := time.Now().Add(DefaultWriteTimeout)
conn.SetWriteDeadline(deadline)
pingStart := time.Now()
if err = conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
if err = listener.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
pingDuration := time.Since(pingStart)
// Check if this is a timeout vs a connection error

View File

@@ -7,16 +7,20 @@ import (
"time"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils/atomic"
)
// WriteRequest represents a write operation to be performed by the write worker
type WriteRequest = publish.WriteRequest
type Listener struct {
*Server
conn *websocket.Conn
@@ -28,6 +32,8 @@ type Listener struct {
startTime time.Time
isBlacklisted bool // Marker to identify blacklisted IPs
blacklistTimeout time.Time // When to timeout blacklisted connections
writeChan chan WriteRequest // Channel for write requests
writeDone chan struct{} // Closed when write worker exits
// Diagnostics: per-connection counters
msgCount int
reqCount int
@@ -40,75 +46,80 @@ func (l *Listener) Ctx() context.Context {
return l.ctx
}
// writeWorker is the single goroutine that handles all writes to the websocket connection.
// This serializes all writes to prevent concurrent write panics.
func (l *Listener) writeWorker() {
defer close(l.writeDone)
for {
select {
case <-l.ctx.Done():
return
case req, ok := <-l.writeChan:
if !ok {
return
}
deadline := req.Deadline
if deadline.IsZero() {
deadline = time.Now().Add(DefaultWriteTimeout)
}
l.conn.SetWriteDeadline(deadline)
writeStart := time.Now()
var err error
if req.IsControl {
err = l.conn.WriteControl(req.MsgType, req.Data, deadline)
} else {
err = l.conn.WriteMessage(req.MsgType, req.Data)
}
if err != nil {
writeDuration := time.Since(writeStart)
log.E.F("ws->%s write worker FAILED: len=%d duration=%v error=%v",
l.remote, len(req.Data), writeDuration, err)
// Check for connection errors - if so, stop the worker
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
return
}
// Continue for other errors (timeouts, etc.)
} else {
writeDuration := time.Since(writeStart)
if writeDuration > time.Millisecond*100 {
log.D.F("ws->%s write worker SLOW: len=%d duration=%v",
l.remote, len(req.Data), writeDuration)
}
}
}
}
}
func (l *Listener) Write(p []byte) (n int, err error) {
start := time.Now()
msgLen := len(p)
// Log message attempt with content preview (first 200 chars for diagnostics)
preview := string(p)
if len(preview) > 200 {
preview = preview[:200] + "..."
// Send write request to channel - non-blocking with timeout
select {
case <-l.ctx.Done():
return 0, l.ctx.Err()
case l.writeChan <- WriteRequest{Data: p, MsgType: websocket.TextMessage, IsControl: false}:
return len(p), nil
case <-time.After(DefaultWriteTimeout):
log.E.F("ws->%s write channel timeout", l.remote)
return 0, errorf.E("write channel timeout")
}
log.T.F(
"ws->%s attempting write: len=%d preview=%q", l.remote, msgLen, preview,
)
}
// Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events
deadline := time.Now().Add(DefaultWriteTimeout)
l.conn.SetWriteDeadline(deadline)
// Attempt the write operation
writeStart := time.Now()
if err = l.conn.WriteMessage(websocket.TextMessage, p); err != nil {
writeDuration := time.Since(writeStart)
totalDuration := time.Since(start)
// Log detailed failure information
log.E.F(
"ws->%s WRITE FAILED: len=%d duration=%v write_duration=%v error=%v preview=%q",
l.remote, msgLen, totalDuration, writeDuration, err, preview,
)
// Check if this is a context timeout
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline") {
log.E.F(
"ws->%s write timeout after %v (limit=%v)", l.remote,
writeDuration, DefaultWriteTimeout,
)
}
// Check connection state
if l.conn != nil {
log.T.F(
"ws->%s connection state during failure: remote_addr=%v",
l.remote, l.req.RemoteAddr,
)
}
chk.E(err) // Still call the original error handler
return
// WriteControl sends a control message through the write channel
func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) {
select {
case <-l.ctx.Done():
return l.ctx.Err()
case l.writeChan <- WriteRequest{Data: data, MsgType: messageType, IsControl: true, Deadline: deadline}:
return nil
case <-time.After(DefaultWriteTimeout):
log.E.F("ws->%s writeControl channel timeout", l.remote)
return errorf.E("writeControl channel timeout")
}
// Log successful write with timing
writeDuration := time.Since(writeStart)
totalDuration := time.Since(start)
n = msgLen
log.T.F(
"ws->%s WRITE SUCCESS: len=%d duration=%v write_duration=%v",
l.remote, n, totalDuration, writeDuration,
)
// Log slow writes for performance diagnostics
if writeDuration > time.Millisecond*100 {
log.T.F(
"ws->%s SLOW WRITE detected: %v (>100ms) len=%d", l.remote,
writeDuration, n,
)
}
return
}
// getManagedACL returns the managed ACL instance if available

View File

@@ -3,7 +3,6 @@ package app
import (
"context"
"fmt"
"strings"
"sync"
"time"
@@ -18,6 +17,7 @@ import (
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/interfaces/publisher"
"next.orly.dev/pkg/interfaces/typer"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/utils"
)
@@ -33,6 +33,9 @@ type Subscription struct {
// connections.
type Map map[*websocket.Conn]map[string]Subscription
// WriteChanMap maps websocket connections to their write channels
type WriteChanMap map[*websocket.Conn]chan<- publish.WriteRequest
type W struct {
*websocket.Conn
@@ -69,19 +72,37 @@ type P struct {
Mx sync.RWMutex
// Map is the map of subscribers and subscriptions from the websocket api.
Map
// WriteChans maps websocket connections to their write channels
WriteChans WriteChanMap
}
var _ publisher.I = &P{}
func NewPublisher(c context.Context) (publisher *P) {
return &P{
c: c,
Map: make(Map),
c: c,
Map: make(Map),
WriteChans: make(WriteChanMap, 100),
}
}
func (p *P) Type() (typeName string) { return Type }
// SetWriteChan stores the write channel for a websocket connection
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan<- publish.WriteRequest) {
p.Mx.Lock()
defer p.Mx.Unlock()
p.WriteChans[conn] = writeChan
}
// GetWriteChan returns the write channel for a websocket connection
func (p *P) GetWriteChan(conn *websocket.Conn) (chan<- publish.WriteRequest, bool) {
p.Mx.RLock()
defer p.Mx.RUnlock()
ch, ok := p.WriteChans[conn]
return ch, ok
}
// Receive handles incoming messages to manage websocket listener subscriptions
// and associated filters.
//
@@ -269,61 +290,40 @@ func (p *P) Deliver(ev *event.E) {
log.D.F("attempting delivery of event %s (kind=%d, len=%d) to subscription %s @ %s",
hex.Enc(ev.ID), ev.Kind, len(msgData), d.id, d.sub.remote)
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events
deadline := time.Now().Add(DefaultWriteTimeout)
d.w.SetWriteDeadline(deadline)
// Get write channel for this connection
p.Mx.RLock()
writeChan, hasChan := p.GetWriteChan(d.w)
stillSubscribed := p.Map[d.w] != nil
p.Mx.RUnlock()
deliveryStart := time.Now()
if err = d.w.WriteMessage(websocket.TextMessage, msgData); err != nil {
deliveryDuration := time.Since(deliveryStart)
// Log detailed failure information
log.E.F("subscription delivery FAILED: event=%s to=%s sub=%s duration=%v error=%v",
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
// Check for timeout specifically
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
if isTimeout {
log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
}
// Only close connection on permanent errors, not transient timeouts
// WebSocket write errors typically indicate connection issues, but we should
// distinguish between timeouts (client might be slow) and connection errors
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
log.D.F("removing failed subscriber connection due to connection error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
} else if isTimeout {
// For timeouts, log but don't immediately close - give it another chance
// The read deadline will catch dead connections eventually
log.W.F("subscription delivery timeout for %s (client may be slow), skipping event but keeping connection", d.sub.remote)
} else {
// Unknown error - be conservative and close
log.D.F("removing failed subscriber connection due to unknown error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
}
if !stillSubscribed {
log.D.F("skipping delivery to %s - connection no longer subscribed", d.sub.remote)
continue
}
deliveryDuration := time.Since(deliveryStart)
log.D.F("subscription delivery SUCCESS: event=%s to=%s sub=%s duration=%v len=%d",
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, len(msgData))
if !hasChan {
log.D.F("skipping delivery to %s - no write channel available", d.sub.remote)
continue
}
// Log slow deliveries for performance monitoring
if deliveryDuration > time.Millisecond*50 {
log.D.F("SLOW subscription delivery: event=%s to=%s duration=%v (>50ms)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration)
// Send to write channel - non-blocking with timeout
select {
case <-p.c.Done():
continue
case writeChan <- publish.WriteRequest{Data: msgData, MsgType: websocket.TextMessage, IsControl: false}:
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s len=%d",
hex.Enc(ev.ID), d.sub.remote, d.id, len(msgData))
case <-time.After(DefaultWriteTimeout):
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s (write channel full)",
hex.Enc(ev.ID), d.sub.remote, d.id)
// Check if connection is still valid
p.Mx.RLock()
stillSubscribed = p.Map[d.w] != nil
p.Mx.RUnlock()
if !stillSubscribed {
log.D.F("removing failed subscriber connection due to channel timeout: %s", d.sub.remote)
p.removeSubscriber(d.w)
}
}
}
}
@@ -340,6 +340,7 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
// Check the actual map after deletion, not the original reference
if len(p.Map[ws]) == 0 {
delete(p.Map, ws)
delete(p.WriteChans, ws)
}
}
}
@@ -350,6 +351,7 @@ func (p *P) removeSubscriber(ws *websocket.Conn) {
defer p.Mx.Unlock()
clear(p.Map[ws])
delete(p.Map, ws)
delete(p.WriteChans, ws)
}
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag

8
go.mod
View File

@@ -20,13 +20,18 @@ require (
golang.org/x/lint v0.0.0-20241112194109-818c5a804067
golang.org/x/net v0.46.0
honnef.co/go/tools v0.6.1
lol.mleku.dev v1.0.4
lol.mleku.dev v1.0.5
lukechampine.com/frand v1.5.1
p256k1.mleku.dev v1.0.1
)
require (
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.6 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgraph-io/ristretto/v2 v2.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/fgprof v0.9.5 // indirect
@@ -35,6 +40,7 @@ require (
github.com/google/flatbuffers v25.9.23+incompatible // indirect
github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/templexxx/cpu v0.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect

16
go.sum
View File

@@ -2,6 +2,10 @@ github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
github.com/btcsuite/btcd/btcec/v2 v2.3.6 h1:IzlsEr9olcSRKB/n7c4351F3xHKxS2lma+1UFGCYd4E=
github.com/btcsuite/btcd/btcec/v2 v2.3.6/go.mod h1:m22FrOAiuxl/tht9wIqAoGHcbnCCaPWyauO8y2LGGtQ=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
@@ -16,6 +20,10 @@ github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
github.com/dgraph-io/ristretto/v2 v2.3.0 h1:qTQ38m7oIyd4GAed/QkUZyPFNMnvVWyazGXRwvOt5zk=
@@ -60,6 +68,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
@@ -138,7 +148,9 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.6.1 h1:R094WgE8K4JirYjBaOpz/AvTyUu/3wbmAoskKN/pxTI=
honnef.co/go/tools v0.6.1/go.mod h1:3puzxxljPCe8RGJX7BIy1plGbxEOZni5mR2aXe3/uk4=
lol.mleku.dev v1.0.4 h1:SOngs7erj8J3nXz673kYFgXQHFO+jkCI1E2iOlpyzV8=
lol.mleku.dev v1.0.4/go.mod h1:DQ0WnmkntA9dPLCXgvtIgYt5G0HSqx3wSTLolHgWeLA=
lol.mleku.dev v1.0.5 h1:irwfwz+Scv74G/2OXmv05YFKOzUNOVZ735EAkYgjgM8=
lol.mleku.dev v1.0.5/go.mod h1:JlsqP0CZDLKRyd85XGcy79+ydSRqmFkrPzYFMYxQ+zs=
lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w=
lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q=
p256k1.mleku.dev v1.0.1 h1:4ZQ+2xNfKpL6+e9urKP6f/QdHKKUNIEsqvFwogpluZw=
p256k1.mleku.dev v1.0.1/go.mod h1:gY2ybEebhiSgSDlJ8ERgAe833dn2EDqs7aBsvwpgu0s=

View File

@@ -4,22 +4,18 @@ package p256k
import (
"lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/p256k/btcec"
p256k1signer "p256k1.mleku.dev/signer"
)
func init() {
log.T.Ln("using btcec signature library")
log.T.Ln("using p256k1.mleku.dev/signer (pure Go/Btcec)")
}
// BTCECSigner is always available but enabling it disables the use of
// github.com/bitcoin-core/secp256k1 CGO signature implementation and points it at the btec
// version.
// Signer is an alias for the BtcecSigner type from p256k1.mleku.dev/signer (btcec version).
// This is used when CGO is not available.
type Signer = p256k1signer.BtcecSigner
type Signer = btcec.Signer
type Keygen = btcec.Keygen
// Keygen is an alias for the P256K1Gen type from p256k1.mleku.dev/signer (btcec version).
type Keygen = p256k1signer.P256K1Gen
func NewKeygen() (k *Keygen) { return new(Keygen) }
var NewSecFromHex = btcec.NewSecFromHex[string]
var NewPubFromHex = btcec.NewPubFromHex[string]
var HexToBin = btcec.HexToBin
var NewKeygen = p256k1signer.NewP256K1Gen

View File

@@ -1,6 +1,9 @@
// Package p256k is a signer interface that (by default) uses the
// bitcoin/libsecp256k1 library for fast signature creation and verification of
// the BIP-340 nostr X-only signatures and public keys, and ECDH.
// Package p256k provides a signer interface that uses p256k1.mleku.dev library for
// fast signature creation and verification of BIP-340 nostr X-only signatures and
// public keys, and ECDH.
//
// Currently the ECDH is only implemented with the btcec library.
// The package provides type aliases to p256k1.mleku.dev/signer:
// - cgo: Uses the CGO-optimized version from p256k1.mleku.dev
// - btcec: Uses the btcec version from p256k1.mleku.dev
// - default: Uses the pure Go version from p256k1.mleku.dev
package p256k

View File

@@ -0,0 +1,41 @@
//go:build !cgo
package p256k
import (
"lol.mleku.dev/chk"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/interfaces/signer"
p256k1signer "p256k1.mleku.dev/signer"
)
func NewSecFromHex[V []byte | string](skh V) (sign signer.I, err error) {
sk := make([]byte, len(skh)/2)
if _, err = hex.DecBytes(sk, []byte(skh)); chk.E(err) {
return
}
sign = p256k1signer.NewBtcecSigner()
if err = sign.InitSec(sk); chk.E(err) {
return
}
return
}
func NewPubFromHex[V []byte | string](pkh V) (sign signer.I, err error) {
pk := make([]byte, len(pkh)/2)
if _, err = hex.DecBytes(pk, []byte(pkh)); chk.E(err) {
return
}
sign = p256k1signer.NewBtcecSigner()
if err = sign.InitPub(pk); chk.E(err) {
return
}
return
}
func HexToBin(hexStr string) (b []byte, err error) {
if b, err = hex.DecAppend(b, []byte(hexStr)); chk.E(err) {
return
}
return
}

View File

@@ -6,6 +6,7 @@ import (
"lol.mleku.dev/chk"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/interfaces/signer"
p256k1signer "p256k1.mleku.dev/signer"
)
func NewSecFromHex[V []byte | string](skh V) (sign signer.I, err error) {
@@ -13,7 +14,7 @@ func NewSecFromHex[V []byte | string](skh V) (sign signer.I, err error) {
if _, err = hex.DecBytes(sk, []byte(skh)); chk.E(err) {
return
}
sign = &Signer{}
sign = p256k1signer.NewP256K1Signer()
if err = sign.InitSec(sk); chk.E(err) {
return
}
@@ -25,7 +26,7 @@ func NewPubFromHex[V []byte | string](pkh V) (sign signer.I, err error) {
if _, err = hex.DecBytes(pk, []byte(pkh)); chk.E(err) {
return
}
sign = &Signer{}
sign = p256k1signer.NewP256K1Signer()
if err = sign.InitPub(pk); chk.E(err) {
return
}

View File

@@ -2,139 +2,19 @@
package p256k
import "C"
import (
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/ec"
"next.orly.dev/pkg/crypto/ec/secp256k1"
"next.orly.dev/pkg/interfaces/signer"
p256k1signer "p256k1.mleku.dev/signer"
)
func init() {
log.T.Ln("using bitcoin/secp256k1 signature library")
log.T.Ln("using p256k1.mleku.dev/signer (CGO)")
}
// Signer implements the signer.I interface.
//
// Either the Sec or Pub must be populated, the former is for generating
// signatures, the latter is for verifying them.
//
// When using this library only for verification, a constructor that converts
// from bytes to PubKey is needed prior to calling Verify.
type Signer struct {
// SecretKey is the secret key.
SecretKey *SecKey
// PublicKey is the public key.
PublicKey *PubKey
// BTCECSec is needed for ECDH as currently the CGO bindings don't include it
BTCECSec *btcec.SecretKey
skb, pkb []byte
}
// Signer is an alias for the P256K1Signer type from p256k1.mleku.dev/signer (cgo version).
type Signer = p256k1signer.P256K1Signer
var _ signer.I = &Signer{}
// Keygen is an alias for the P256K1Gen type from p256k1.mleku.dev/signer (cgo version).
type Keygen = p256k1signer.P256K1Gen
// Generate a new Signer key pair using the CGO bindings to libsecp256k1
func (s *Signer) Generate() (err error) {
var cs *Sec
var cx *XPublicKey
if s.skb, s.pkb, cs, cx, err = Generate(); chk.E(err) {
return
}
s.SecretKey = &cs.Key
s.PublicKey = cx.Key
s.BTCECSec, _ = btcec.PrivKeyFromBytes(s.skb)
return
}
func (s *Signer) InitSec(skb []byte) (err error) {
var cs *Sec
var cx *XPublicKey
// var cp *PublicKey
if s.pkb, cs, cx, err = FromSecretBytes(skb); chk.E(err) {
if err.Error() != "provided secret generates a public key with odd Y coordinate, fixed version returned" {
log.E.Ln(err)
return
}
}
s.skb = skb
s.SecretKey = &cs.Key
s.PublicKey = cx.Key
// s.ECPublicKey = cp.Key
// needed for ecdh
s.BTCECSec, _ = btcec.PrivKeyFromBytes(s.skb)
return
}
func (s *Signer) InitPub(pub []byte) (err error) {
var up *Pub
if up, err = PubFromBytes(pub); chk.E(err) {
return
}
s.PublicKey = &up.Key
s.pkb = up.PubB()
return
}
func (s *Signer) Sec() (b []byte) {
if s == nil {
return nil
}
return s.skb
}
func (s *Signer) Pub() (b []byte) {
if s == nil {
return nil
}
return s.pkb
}
// func (s *Signer) ECPub() (b []byte) { return s.pkb }
func (s *Signer) Sign(msg []byte) (sig []byte, err error) {
if s.SecretKey == nil {
err = errorf.E("p256k: I secret not initialized")
return
}
u := ToUchar(msg)
if sig, err = Sign(u, s.SecretKey); chk.E(err) {
return
}
return
}
func (s *Signer) Verify(msg, sig []byte) (valid bool, err error) {
if s.PublicKey == nil {
err = errorf.E("p256k: Pubkey not initialized")
return
}
var uMsg, uSig *Uchar
if uMsg, err = Msg(msg); chk.E(err) {
return
}
if uSig, err = Sig(sig); chk.E(err) {
return
}
valid = Verify(uMsg, uSig, s.PublicKey)
if !valid {
err = errorf.E("p256k: invalid signature")
}
return
}
func (s *Signer) ECDH(pubkeyBytes []byte) (secret []byte, err error) {
var pub *secp256k1.PublicKey
if pub, err = secp256k1.ParsePubKey(
append(
[]byte{0x02},
pubkeyBytes...,
),
); chk.E(err) {
return
}
secret = btcec.GenerateSharedSecret(s.BTCECSec, pub)
return
}
func (s *Signer) Zero() { Zero(s.SecretKey) }
var NewKeygen = p256k1signer.NewP256K1Gen

View File

@@ -1,426 +0,0 @@
//go:build cgo
package p256k
import (
"crypto/rand"
"unsafe"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/ec/schnorr"
"next.orly.dev/pkg/crypto/ec/secp256k1"
"next.orly.dev/pkg/crypto/sha256"
)
/*
#cgo LDFLAGS: -lsecp256k1
#include <secp256k1.h>
#include <secp256k1_schnorrsig.h>
#include <secp256k1_extrakeys.h>
*/
import "C"
type (
Context = C.secp256k1_context
Uchar = C.uchar
Cint = C.int
SecKey = C.secp256k1_keypair
PubKey = C.secp256k1_xonly_pubkey
ECPubKey = C.secp256k1_pubkey
)
var (
ctx *Context
)
func CreateContext() *Context {
return C.secp256k1_context_create(
C.SECP256K1_CONTEXT_SIGN |
C.SECP256K1_CONTEXT_VERIFY,
)
}
func GetRandom() (u *Uchar) {
rnd := make([]byte, 32)
_, _ = rand.Read(rnd)
return ToUchar(rnd)
}
func AssertLen(b []byte, length int, name string) (err error) {
if len(b) != length {
err = errorf.E("%s should be %d bytes, got %d", name, length, len(b))
}
return
}
func RandomizeContext(ctx *C.secp256k1_context) {
C.secp256k1_context_randomize(ctx, GetRandom())
return
}
func CreateRandomContext() (c *Context) {
c = CreateContext()
RandomizeContext(c)
return
}
func init() {
if ctx = CreateContext(); ctx == nil {
panic("failed to create secp256k1 context")
}
}
func ToUchar(b []byte) (u *Uchar) { return (*Uchar)(unsafe.Pointer(&b[0])) }
type Sec struct {
Key SecKey
}
func GenSec() (sec *Sec, err error) {
if _, _, sec, _, err = Generate(); chk.E(err) {
return
}
return
}
func SecFromBytes(sk []byte) (sec *Sec, err error) {
sec = new(Sec)
if C.secp256k1_keypair_create(ctx, &sec.Key, ToUchar(sk)) != 1 {
err = errorf.E("failed to parse private key")
return
}
return
}
func (s *Sec) Sec() *SecKey { return &s.Key }
func (s *Sec) Pub() (p *Pub, err error) {
p = new(Pub)
if C.secp256k1_keypair_xonly_pub(ctx, &p.Key, nil, s.Sec()) != 1 {
err = errorf.E("pubkey derivation failed")
return
}
return
}
// type PublicKey struct {
// Key *C.secp256k1_pubkey
// }
//
// func NewPublicKey() *PublicKey {
// return &PublicKey{
// Key: &C.secp256k1_pubkey{},
// }
// }
type XPublicKey struct {
Key *C.secp256k1_xonly_pubkey
}
func NewXPublicKey() *XPublicKey {
return &XPublicKey{
Key: &C.secp256k1_xonly_pubkey{},
}
}
// FromSecretBytes parses and processes what should be a secret key. If it is a correct key within the curve order, but
// with a public key having an odd Y coordinate, it returns an error with the fixed key.
func FromSecretBytes(skb []byte) (
pkb []byte,
sec *Sec,
pub *XPublicKey,
// ecPub *PublicKey,
err error,
) {
xpkb := make([]byte, schnorr.PubKeyBytesLen)
// clen := C.size_t(secp256k1.PubKeyBytesLenCompressed - 1)
pkb = make([]byte, schnorr.PubKeyBytesLen)
var parity Cint
// ecPub = NewPublicKey()
pub = NewXPublicKey()
sec = &Sec{}
uskb := ToUchar(skb)
res := C.secp256k1_keypair_create(ctx, &sec.Key, uskb)
if res != 1 {
err = errorf.E("failed to create secp256k1 keypair")
return
}
// C.secp256k1_keypair_pub(ctx, ecPub.Key, &sec.Key)
// C.secp256k1_ec_pubkey_serialize(ctx, ToUchar(ecpkb), &clen, ecPub.Key,
// C.SECP256K1_EC_COMPRESSED)
// if ecpkb[0] != 2 {
// log.W.ToSliceOfBytes("odd pubkey from %0x -> %0x", skb, ecpkb)
// Negate(skb)
// uskb = ToUchar(skb)
// res = C.secp256k1_keypair_create(ctx, &sec.Key, uskb)
// if res != 1 {
// err = errorf.E("failed to create secp256k1 keypair")
// return
// }
// C.secp256k1_keypair_pub(ctx, ecPub.Key, &sec.Key)
// C.secp256k1_ec_pubkey_serialize(ctx, ToUchar(ecpkb), &clen, ecPub.Key, C.SECP256K1_EC_COMPRESSED)
// C.secp256k1_keypair_xonly_pub(ctx, pub.Key, &parity, &sec.Key)
// err = errors.New("provided secret generates a public key with odd Y coordinate, fixed version returned")
// }
C.secp256k1_keypair_xonly_pub(ctx, pub.Key, &parity, &sec.Key)
C.secp256k1_xonly_pubkey_serialize(ctx, ToUchar(xpkb), pub.Key)
pkb = xpkb
// log.I.S(sec, pub, skb, pkb)
return
}
// Generate gathers entropy to generate a full set of bytes and CGO values of it and derived from it to perform
// signature and ECDH operations.
func Generate() (
skb, pkb []byte,
sec *Sec,
pub *XPublicKey,
err error,
) {
skb = make([]byte, secp256k1.SecKeyBytesLen)
pkb = make([]byte, schnorr.PubKeyBytesLen)
upkb := ToUchar(pkb)
var parity Cint
pub = NewXPublicKey()
sec = &Sec{}
for {
if _, err = rand.Read(skb); chk.E(err) {
return
}
uskb := ToUchar(skb)
if res := C.secp256k1_keypair_create(ctx, &sec.Key, uskb); res != 1 {
err = errorf.E("failed to create secp256k1 keypair")
continue
}
C.secp256k1_keypair_xonly_pub(ctx, pub.Key, &parity, &sec.Key)
C.secp256k1_xonly_pubkey_serialize(ctx, upkb, pub.Key)
break
}
return
}
// Negate inverts a secret key so an odd prefix bit becomes even and vice versa.
func Negate(uskb []byte) { C.secp256k1_ec_seckey_negate(ctx, ToUchar(uskb)) }
type ECPub struct {
Key ECPubKey
}
// ECPubFromSchnorrBytes converts a BIP-340 public key to its even standard 33 byte encoding.
//
// This function is for the purpose of getting a key to do ECDH from an x-only key.
func ECPubFromSchnorrBytes(xkb []byte) (pub *ECPub, err error) {
if err = AssertLen(xkb, schnorr.PubKeyBytesLen, "pubkey"); chk.E(err) {
return
}
pub = &ECPub{}
p := append([]byte{0}, xkb...)
if C.secp256k1_ec_pubkey_parse(
ctx, &pub.Key, ToUchar(p),
secp256k1.PubKeyBytesLenCompressed,
) != 1 {
err = errorf.E("failed to parse pubkey from %0x", p)
log.I.S(pub)
return
}
return
}
// // ECPubFromBytes parses a pubkey from 33 bytes to the bitcoin-core/secp256k1 struct.
// func ECPubFromBytes(pkb []byte) (pub *ECPub, err error) {
// if err = AssertLen(pkb, secp256k1.PubKeyBytesLenCompressed, "pubkey"); chk.E(err) {
// return
// }
// pub = &ECPub{}
// if C.secp256k1_ec_pubkey_parse(ctx, &pub.Key, ToUchar(pkb),
// secp256k1.PubKeyBytesLenCompressed) != 1 {
// err = errorf.E("failed to parse pubkey from %0x", pkb)
// log.I.S(pub)
// return
// }
// return
// }
// Pub is a schnorr BIP-340 public key.
type Pub struct {
Key PubKey
}
// PubFromBytes creates a public key from raw bytes.
func PubFromBytes(pk []byte) (pub *Pub, err error) {
if err = AssertLen(pk, schnorr.PubKeyBytesLen, "pubkey"); chk.E(err) {
return
}
pub = new(Pub)
if C.secp256k1_xonly_pubkey_parse(ctx, &pub.Key, ToUchar(pk)) != 1 {
err = errorf.E("failed to parse pubkey from %0x", pk)
return
}
return
}
// PubB returns the contained public key as bytes.
func (p *Pub) PubB() (b []byte) {
b = make([]byte, schnorr.PubKeyBytesLen)
C.secp256k1_xonly_pubkey_serialize(ctx, ToUchar(b), &p.Key)
return
}
// Pub returns the public key as a PubKey.
func (p *Pub) Pub() *PubKey { return &p.Key }
// ToBytes returns the contained public key as bytes.
func (p *Pub) ToBytes() (b []byte, err error) {
b = make([]byte, schnorr.PubKeyBytesLen)
if C.secp256k1_xonly_pubkey_serialize(ctx, ToUchar(b), p.Pub()) != 1 {
err = errorf.E("pubkey serialize failed")
return
}
return
}
// Sign a message and return a schnorr BIP-340 64 byte signature.
func Sign(msg *Uchar, sk *SecKey) (sig []byte, err error) {
sig = make([]byte, schnorr.SignatureSize)
c := CreateRandomContext()
if C.secp256k1_schnorrsig_sign32(
c, ToUchar(sig), msg, sk,
GetRandom(),
) != 1 {
err = errorf.E("failed to sign message")
return
}
return
}
// SignFromBytes Signs a message using a provided secret key and message as raw bytes.
func SignFromBytes(msg, sk []byte) (sig []byte, err error) {
var umsg *Uchar
if umsg, err = Msg(msg); chk.E(err) {
return
}
var sec *Sec
if sec, err = SecFromBytes(sk); chk.E(err) {
return
}
return Sign(umsg, sec.Sec())
}
// Msg checks that a message hash is correct, and converts it for use with a Signer.
func Msg(b []byte) (id *Uchar, err error) {
if err = AssertLen(b, sha256.Size, "id"); chk.E(err) {
return
}
id = ToUchar(b)
return
}
// Sig checks that a signature bytes is correct, and converts it for use with a Signer.
func Sig(b []byte) (sig *Uchar, err error) {
if err = AssertLen(b, schnorr.SignatureSize, "sig"); chk.E(err) {
return
}
sig = ToUchar(b)
return
}
// Verify a message signature matches the provided PubKey.
func Verify(msg, sig *Uchar, pk *PubKey) (valid bool) {
return C.secp256k1_schnorrsig_verify(ctx, sig, msg, 32, pk) == 1
}
// VerifyFromBytes a signature from the raw bytes of the message hash, signature and public key
func VerifyFromBytes(msg, sig, pk []byte) (err error) {
var umsg, usig *Uchar
if umsg, err = Msg(msg); chk.E(err) {
return
}
if usig, err = Sig(sig); chk.E(err) {
return
}
var pub *Pub
if pub, err = PubFromBytes(pk); chk.E(err) {
return
}
valid := Verify(umsg, usig, pub.Pub())
if !valid {
err = errorf.E("failed to verify signature")
}
return
}
// Zero wipes the memory of a SecKey by overwriting it three times with random data and then
// zeroing it.
func Zero(sk *SecKey) {
b := (*[96]byte)(unsafe.Pointer(sk))[:96]
for range 3 {
rand.Read(b)
// reverse the order and negate
lb := len(b)
l := lb / 2
for j := range l {
b[j] = ^b[lb-1-j]
}
}
for i := range b {
b[i] = 0
}
}
// Keygen is an implementation of a key miner designed to be used for vanity key generation with X-only BIP-340 keys.
type Keygen struct {
secBytes, comprPubBytes []byte
secUchar, cmprPubUchar *Uchar
sec *Sec
// ecpub *PublicKey
cmprLen C.size_t
}
// NewKeygen allocates the required buffers for deriving a key. This should only be done once to avoid garbage and make
// the key mining as fast as possible.
//
// This allocates everything and creates proper CGO variables needed for the generate function so they only need to be
// allocated once per thread.
func NewKeygen() (k *Keygen) {
k = new(Keygen)
k.cmprLen = C.size_t(secp256k1.PubKeyBytesLenCompressed)
k.secBytes = make([]byte, secp256k1.SecKeyBytesLen)
k.comprPubBytes = make([]byte, secp256k1.PubKeyBytesLenCompressed)
k.secUchar = ToUchar(k.secBytes)
k.cmprPubUchar = ToUchar(k.comprPubBytes)
k.sec = &Sec{}
// k.ecpub = NewPublicKey()
return
}
// Generate takes a pair of buffers for the secret and ec pubkey bytes and gathers new entropy and returns a valid
// secret key and the compressed pubkey bytes for the partial collision search.
//
// The first byte of pubBytes must be sliced off before deriving the hex/Bech32 forms of the nostr public key.
func (k *Keygen) Generate() (
sec *Sec,
pub *XPublicKey,
pubBytes []byte,
err error,
) {
if _, err = rand.Read(k.secBytes); chk.E(err) {
return
}
if res := C.secp256k1_keypair_create(
ctx, &k.sec.Key, k.secUchar,
); res != 1 {
err = errorf.E("failed to create secp256k1 keypair")
return
}
var parity Cint
C.secp256k1_keypair_xonly_pub(ctx, pub.Key, &parity, &sec.Key)
// C.secp256k1_keypair_pub(ctx, k.ecpub.Key, &k.sec.Key)
// C.secp256k1_ec_pubkey_serialize(ctx, k.cmprPubUchar, &k.cmprLen, k.ecpub.Key,
// C.SECP256K1_EC_COMPRESSED)
// pubBytes = k.comprPubBytes
C.secp256k1_xonly_pubkey_serialize(ctx, ToUchar(pubBytes), pub.Key)
// pubBytes =
return
}

View File

@@ -1,11 +1,28 @@
package publish
import (
"time"
"github.com/gorilla/websocket"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/interfaces/publisher"
"next.orly.dev/pkg/interfaces/typer"
)
// WriteRequest represents a write operation to be performed by the write worker
type WriteRequest struct {
Data []byte
MsgType int
IsControl bool
Deadline time.Time
}
// WriteChanSetter defines the interface for setting write channels
type WriteChanSetter interface {
SetWriteChan(*websocket.Conn, chan<- WriteRequest)
GetWriteChan(*websocket.Conn) (chan<- WriteRequest, bool)
}
// S is the control structure for the subscription management scheme.
type S struct {
publisher.Publishers
@@ -36,3 +53,15 @@ func (s *S) Receive(msg typer.T) {
}
}
}
// GetSocketPublisher returns the socketapi publisher instance
func (s *S) GetSocketPublisher() WriteChanSetter {
for _, p := range s.Publishers {
if p.Type() == "socketapi" {
if socketPub, ok := p.(WriteChanSetter); ok {
return socketPub
}
}
}
return nil
}

View File

@@ -1 +1 @@
v0.21.4
v0.23.0