Add DMRelaysList.K to kind.go and improve error messages with remote address in connection.go
- Added `DMRelaysList.K` constant to `pkg/encoders/kind/kind.go` - Removed unused `"errors"` import from `pkg/protocol/ws/connection.go` - Updated error messages in `WriteMessage`, `ReadMessage`, and related functions in `pkg/protocol/ws/connection.go` to include the remote address for better debugging - Changed error handling from `chk.E(err)` to `chk.T(err)` in `pkg/encoders/envelopes/eventenvelope/eventenvelope.go` - Updated ticker interval from 30 minutes to 1 hour in `pkg/app/relay/server.go`
This commit is contained in:
@@ -191,7 +191,7 @@ func (s *Server) Start(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// start up a spider run to trigger every 30 minutes
|
// start up a spider run to trigger every 30 minutes
|
||||||
ticker := time.NewTicker(30 * time.Minute)
|
ticker := time.NewTicker(time.Hour)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
// envelope into it.
|
// envelope into it.
|
||||||
func ParseResult(b []byte) (t *Result, rem []byte, err error) {
|
func ParseResult(b []byte) (t *Result, rem []byte, err error) {
|
||||||
t = NewResult()
|
t = NewResult()
|
||||||
if rem, err = t.Unmarshal(b); chk.E(err) {
|
if rem, err = t.Unmarshal(b); chk.T(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -378,6 +378,7 @@ var Map = map[uint16]string{
|
|||||||
SearchRelaysList.K: "SearchRelaysList",
|
SearchRelaysList.K: "SearchRelaysList",
|
||||||
InterestsList.K: "InterestsList",
|
InterestsList.K: "InterestsList",
|
||||||
UserEmojiList.K: "UserEmojiList",
|
UserEmojiList.K: "UserEmojiList",
|
||||||
|
DMRelaysList.K: "DMRelaysList",
|
||||||
FileStorageServerList.K: "FileStorageServerList",
|
FileStorageServerList.K: "FileStorageServerList",
|
||||||
NWCWalletInfo.K: "NWCWalletInfo",
|
NWCWalletInfo.K: "NWCWalletInfo",
|
||||||
LightningPubRPC.K: "LightningPubRPC",
|
LightningPubRPC.K: "LightningPubRPC",
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"compress/flate"
|
"compress/flate"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
|
||||||
"github.com/gobwas/httphead"
|
"github.com/gobwas/httphead"
|
||||||
"github.com/gobwas/ws"
|
"github.com/gobwas/ws"
|
||||||
"github.com/gobwas/ws/wsflate"
|
"github.com/gobwas/ws/wsflate"
|
||||||
@@ -113,7 +112,10 @@ func NewConnection(
|
|||||||
func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
||||||
select {
|
select {
|
||||||
case <-c.Done():
|
case <-c.Done():
|
||||||
return errors.New("context canceled")
|
return errorf.E(
|
||||||
|
"%s context canceled",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if cn.msgStateW.IsCompressed() && cn.enableCompression {
|
if cn.msgStateW.IsCompressed() && cn.enableCompression {
|
||||||
@@ -121,19 +123,35 @@ func (cn *Connection) WriteMessage(c context.T, data []byte) (err error) {
|
|||||||
if _, err := io.Copy(
|
if _, err := io.Copy(
|
||||||
cn.flateWriter, bytes.NewReader(data),
|
cn.flateWriter, bytes.NewReader(data),
|
||||||
); chk.T(err) {
|
); chk.T(err) {
|
||||||
return errorf.E("failed to write message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to write message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cn.flateWriter.Close(); chk.T(err) {
|
if err := cn.flateWriter.Close(); chk.T(err) {
|
||||||
return errorf.E("failed to close flate writer: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to close flate writer: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
|
if _, err := io.Copy(cn.writer, bytes.NewReader(data)); chk.T(err) {
|
||||||
return errorf.E("failed to write message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to write message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := cn.writer.Flush(); chk.T(err) {
|
if err := cn.writer.Flush(); chk.T(err) {
|
||||||
return errorf.E("failed to flush writer: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to flush writer: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -143,34 +161,57 @@ func (cn *Connection) ReadMessage(c context.T, buf io.Writer) (err error) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.Done():
|
case <-c.Done():
|
||||||
return errors.New("context canceled")
|
return errorf.D(
|
||||||
|
"%s context canceled",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
h, err := cn.reader.NextFrame()
|
h, err := cn.reader.NextFrame()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cn.conn.Close()
|
cn.conn.Close()
|
||||||
return errorf.E("failed to advance frame: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to advance frame: %s",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
if h.OpCode.IsControl() {
|
if h.OpCode.IsControl() {
|
||||||
if err := cn.controlHandler(h, cn.reader); chk.T(err) {
|
if err := cn.controlHandler(h, cn.reader); chk.T(err) {
|
||||||
return errorf.E("failed to handle control frame: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to handle control frame: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else if h.OpCode == ws.OpBinary ||
|
} else if h.OpCode == ws.OpBinary ||
|
||||||
h.OpCode == ws.OpText {
|
h.OpCode == ws.OpText {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := cn.reader.Discard(); chk.T(err) {
|
if err := cn.reader.Discard(); chk.T(err) {
|
||||||
return errorf.E("failed to discard: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to discard: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cn.msgStateR.IsCompressed() && cn.enableCompression {
|
if cn.msgStateR.IsCompressed() && cn.enableCompression {
|
||||||
cn.flateReader.Reset(cn.reader)
|
cn.flateReader.Reset(cn.reader)
|
||||||
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
|
if _, err := io.Copy(buf, cn.flateReader); chk.T(err) {
|
||||||
return errorf.E("failed to read message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to read message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err := io.Copy(buf, cn.reader); chk.T(err) {
|
if _, err := io.Copy(buf, cn.reader); chk.T(err) {
|
||||||
return errorf.E("failed to read message: %w", err)
|
return errorf.E(
|
||||||
|
"%s failed to read message: %w",
|
||||||
|
cn.conn.RemoteAddr(),
|
||||||
|
err,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user