Update version to v0.21.0 and enhance relay client functionality
- Bumped version from v0.20.6 to v0.21.0. - Added a `complete` map in the Client struct to track subscription completion status. - Improved event handling in the read loop to manage EOSE messages and subscription closures. - Introduced new tests for filtering, event ordering, and subscription behaviors, enhancing test coverage and reliability.
This commit is contained in:
@@ -14,14 +14,15 @@ import (
|
||||
|
||||
// Client wraps a WebSocket connection to a relay for testing.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
url string
|
||||
mu sync.Mutex
|
||||
subs map[string]chan []byte
|
||||
okCh chan []byte // Channel for OK messages
|
||||
countCh chan []byte // Channel for COUNT messages
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
conn *websocket.Conn
|
||||
url string
|
||||
mu sync.Mutex
|
||||
subs map[string]chan []byte
|
||||
complete map[string]bool // Track if subscription is complete (e.g., by ID)
|
||||
okCh chan []byte // Channel for OK messages
|
||||
countCh chan []byte // Channel for COUNT messages
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewClient creates a new test client connected to the relay.
|
||||
@@ -36,13 +37,14 @@ func NewClient(url string) (c *Client, err error) {
|
||||
return
|
||||
}
|
||||
c = &Client{
|
||||
conn: conn,
|
||||
url: url,
|
||||
subs: make(map[string]chan []byte),
|
||||
okCh: make(chan []byte, 100),
|
||||
countCh: make(chan []byte, 100),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
conn: conn,
|
||||
url: url,
|
||||
subs: make(map[string]chan []byte),
|
||||
complete: make(map[string]bool),
|
||||
okCh: make(chan []byte, 100),
|
||||
countCh: make(chan []byte, 100),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
go c.readLoop()
|
||||
return
|
||||
@@ -109,8 +111,17 @@ func (c *Client) readLoop() {
|
||||
if len(raw) >= 2 {
|
||||
if subID, ok := raw[1].(string); ok {
|
||||
if ch, exists := c.subs[subID]; exists {
|
||||
close(ch)
|
||||
delete(c.subs, subID)
|
||||
// Send EOSE message to channel
|
||||
select {
|
||||
case ch <- msg:
|
||||
default:
|
||||
}
|
||||
// For complete subscriptions (by ID), close the channel after EOSE
|
||||
if c.complete[subID] {
|
||||
close(ch)
|
||||
delete(c.subs, subID)
|
||||
delete(c.complete, subID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,6 +158,19 @@ func (c *Client) Subscribe(subID string, filters []interface{}) (ch chan []byte,
|
||||
c.mu.Lock()
|
||||
ch = make(chan []byte, 100)
|
||||
c.subs[subID] = ch
|
||||
// Check if subscription is complete (has 'ids' filter)
|
||||
isComplete := false
|
||||
for _, f := range filters {
|
||||
if fMap, ok := f.(map[string]interface{}); ok {
|
||||
if ids, exists := fMap["ids"]; exists {
|
||||
if idList, ok := ids.([]string); ok && len(idList) > 0 {
|
||||
isComplete = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
c.complete[subID] = isComplete
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -165,6 +189,7 @@ func (c *Client) Unsubscribe(subID string) error {
|
||||
close(ch)
|
||||
}()
|
||||
delete(c.subs, subID)
|
||||
delete(c.complete, subID)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return c.Send([]interface{}{"CLOSE", subID})
|
||||
@@ -269,14 +294,27 @@ func (c *Client) GetEvents(subID string, filters []interface{}, timeout time.Dur
|
||||
if err = json.Unmarshal(msg, &raw); err != nil {
|
||||
continue
|
||||
}
|
||||
if len(raw) >= 3 && raw[0] == "EVENT" {
|
||||
if evData, ok := raw[2].(map[string]interface{}); ok {
|
||||
evJSON, _ := json.Marshal(evData)
|
||||
ev := event.New()
|
||||
if _, err = ev.Unmarshal(evJSON); err == nil {
|
||||
events = append(events, ev)
|
||||
if len(raw) < 2 {
|
||||
continue
|
||||
}
|
||||
typ, ok := raw[0].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
switch typ {
|
||||
case "EVENT":
|
||||
if len(raw) >= 3 {
|
||||
if evData, ok := raw[2].(map[string]interface{}); ok {
|
||||
evJSON, _ := json.Marshal(evData)
|
||||
ev := event.New()
|
||||
if _, err = ev.Unmarshal(evJSON); err == nil {
|
||||
events = append(events, ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
case "EOSE":
|
||||
// End of stored events - return what we have
|
||||
return events, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user