- Introduced a new skill for Golang, providing comprehensive guidance on writing, debugging, and best practices for Go programming. - Added reference materials including effective Go guidelines, common patterns, and a quick reference cheat sheet to support users in Go development. - Created a skill creator guide to assist in developing new skills with structured templates and resource management. - Implemented scripts for skill initialization and packaging to streamline the skill creation process.
360 lines
10 KiB
Go
360 lines
10 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"lol.mleku.dev/chk"
|
|
"lol.mleku.dev/log"
|
|
"next.orly.dev/pkg/acl"
|
|
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
|
|
"next.orly.dev/pkg/encoders/event"
|
|
"next.orly.dev/pkg/encoders/filter"
|
|
"next.orly.dev/pkg/encoders/hex"
|
|
"next.orly.dev/pkg/encoders/kind"
|
|
"next.orly.dev/pkg/interfaces/publisher"
|
|
"next.orly.dev/pkg/interfaces/typer"
|
|
"next.orly.dev/pkg/protocol/publish"
|
|
"next.orly.dev/pkg/utils"
|
|
)
|
|
|
|
// Type is the type name reported by both W.Type and P.Type.
const Type = "socketapi"
|
|
|
|
// WriteChanMap maps websocket connections to their write channels
|
|
type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest
|
|
|
|
type Subscription struct {
|
|
remote string
|
|
AuthedPubkey []byte
|
|
*filter.S
|
|
}
|
|
|
|
// Map is a map of filters associated with a collection of ws.Listener
|
|
// connections.
|
|
type Map map[*websocket.Conn]map[string]Subscription
|
|
|
|
type W struct {
|
|
*websocket.Conn
|
|
|
|
remote string
|
|
|
|
// If Cancel is true, this is a close command.
|
|
Cancel bool
|
|
|
|
// Id is the subscription Id. If Cancel is true, cancel the named
|
|
// subscription, otherwise, cancel the publisher for the socket.
|
|
Id string
|
|
|
|
// The Receiver holds the event channel for receiving notifications or data
|
|
// relevant to this WebSocket connection.
|
|
Receiver event.C
|
|
|
|
// Filters holds a collection of filters used to match or process events
|
|
// associated with this WebSocket connection. It is used to determine which
|
|
// notifications or data should be received by the subscriber.
|
|
Filters *filter.S
|
|
|
|
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
|
|
AuthedPubkey []byte
|
|
}
|
|
|
|
// Type returns the package-level Type constant as the type name of a W
// message.
func (w *W) Type() (typeName string) {
	typeName = Type
	return typeName
}
|
|
|
|
// P is a structure that manages subscriptions and associated filters for
|
|
// websocket listeners. It uses a mutex to synchronize access to a map storing
|
|
// subscriber connections and their filter configurations.
|
|
type P struct {
|
|
c context.Context
|
|
// Mx is the mutex for the Map.
|
|
Mx sync.RWMutex
|
|
// Map is the map of subscribers and subscriptions from the websocket api.
|
|
Map
|
|
// WriteChans maps websocket connections to their write channels
|
|
WriteChans WriteChanMap
|
|
}
|
|
|
|
// Compile-time check that *P satisfies publisher.I.
var _ publisher.I = (*P)(nil)
|
|
|
|
func NewPublisher(c context.Context) (publisher *P) {
|
|
return &P{
|
|
c: c,
|
|
Map: make(Map),
|
|
WriteChans: make(WriteChanMap, 100),
|
|
}
|
|
}
|
|
|
|
// Type returns the package-level Type constant as the type name of this
// publisher.
func (p *P) Type() (typeName string) {
	typeName = Type
	return typeName
}
|
|
|
|
|
|
// Receive handles incoming messages to manage websocket listener subscriptions
|
|
// and associated filters.
|
|
//
|
|
// # Parameters
|
|
//
|
|
// - msg (publisher.Message): The incoming message to process; expected to be of
|
|
// type *W to trigger subscription management actions.
|
|
//
|
|
// # Expected behaviour
|
|
//
|
|
// - Checks if the message is of type *W.
|
|
//
|
|
// - If Cancel is true, removes a subscriber by ID or the entire listener.
|
|
//
|
|
// - Otherwise, adds the subscription to the map under a mutex lock.
|
|
//
|
|
// - Logs actions related to subscription creation or removal.
|
|
func (p *P) Receive(msg typer.T) {
|
|
if m, ok := msg.(*W); ok {
|
|
if m.Cancel {
|
|
if m.Id == "" {
|
|
p.removeSubscriber(m.Conn)
|
|
} else {
|
|
p.removeSubscriberId(m.Conn, m.Id)
|
|
}
|
|
return
|
|
}
|
|
p.Mx.Lock()
|
|
defer p.Mx.Unlock()
|
|
if subs, ok := p.Map[m.Conn]; !ok {
|
|
subs = make(map[string]Subscription)
|
|
subs[m.Id] = Subscription{
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
|
}
|
|
p.Map[m.Conn] = subs
|
|
} else {
|
|
subs[m.Id] = Subscription{
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Deliver processes and distributes an event to all matching subscribers based on their filter configurations.
|
|
//
|
|
// # Parameters
|
|
//
|
|
// - ev (*event.E): The event to be delivered to subscribed clients.
|
|
//
|
|
// # Expected behaviour
|
|
//
|
|
// Delivers the event to all subscribers whose filters match the event. It
|
|
// applies authentication checks if required by the server and skips delivery
|
|
// for unauthenticated users when events are privileged.
|
|
func (p *P) Deliver(ev *event.E) {
|
|
var err error
|
|
// Snapshot the deliveries under read lock to avoid holding locks during I/O
|
|
p.Mx.RLock()
|
|
type delivery struct {
|
|
w *websocket.Conn
|
|
id string
|
|
sub Subscription
|
|
}
|
|
var deliveries []delivery
|
|
for w, subs := range p.Map {
|
|
for id, subscriber := range subs {
|
|
if subscriber.Match(ev) {
|
|
deliveries = append(
|
|
deliveries, delivery{w: w, id: id, sub: subscriber},
|
|
)
|
|
}
|
|
}
|
|
}
|
|
p.Mx.RUnlock()
|
|
if len(deliveries) > 0 {
|
|
log.D.C(
|
|
func() string {
|
|
return fmt.Sprintf(
|
|
"delivering event %0x to websocket subscribers %d", ev.ID,
|
|
len(deliveries),
|
|
)
|
|
},
|
|
)
|
|
}
|
|
for _, d := range deliveries {
|
|
// If the event is privileged, enforce that the subscriber's authed pubkey matches
|
|
// either the event pubkey or appears in any 'p' tag of the event.
|
|
if kind.IsPrivileged(ev.Kind) {
|
|
if len(d.sub.AuthedPubkey) == 0 {
|
|
// Not authenticated - cannot see privileged events
|
|
log.D.F("subscription delivery DENIED for privileged event %s to %s (not authenticated)",
|
|
hex.Enc(ev.ID), d.sub.remote)
|
|
continue
|
|
}
|
|
|
|
pk := d.sub.AuthedPubkey
|
|
allowed := false
|
|
// Direct author match
|
|
if utils.FastEqual(ev.Pubkey, pk) {
|
|
allowed = true
|
|
} else if ev.Tags != nil {
|
|
for _, pTag := range ev.Tags.GetAll([]byte("p")) {
|
|
// pTag.Value() returns []byte hex string; decode to bytes
|
|
dec, derr := hex.Dec(string(pTag.Value()))
|
|
if derr != nil {
|
|
continue
|
|
}
|
|
if utils.FastEqual(dec, pk) {
|
|
allowed = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if !allowed {
|
|
log.D.F("subscription delivery DENIED for privileged event %s to %s (auth mismatch)",
|
|
hex.Enc(ev.ID), d.sub.remote)
|
|
// Skip delivery for this subscriber
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check for private tags - only deliver to authorized users
|
|
if ev.Tags != nil && ev.Tags.Len() > 0 {
|
|
hasPrivateTag := false
|
|
var privatePubkey []byte
|
|
|
|
for _, t := range *ev.Tags {
|
|
if t.Len() >= 2 {
|
|
keyBytes := t.Key()
|
|
if len(keyBytes) == 7 && string(keyBytes) == "private" {
|
|
hasPrivateTag = true
|
|
privatePubkey = t.Value()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if hasPrivateTag {
|
|
canSeePrivate := p.canSeePrivateEvent(d.sub.AuthedPubkey, privatePubkey, d.sub.remote)
|
|
if !canSeePrivate {
|
|
log.D.F("subscription delivery DENIED for private event %s to %s (unauthorized)",
|
|
hex.Enc(ev.ID), d.sub.remote)
|
|
continue
|
|
}
|
|
log.D.F("subscription delivery ALLOWED for private event %s to %s (authorized)",
|
|
hex.Enc(ev.ID), d.sub.remote)
|
|
}
|
|
}
|
|
|
|
var res *eventenvelope.Result
|
|
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
|
|
log.E.F("failed to create event envelope for %s to %s: %v",
|
|
hex.Enc(ev.ID), d.sub.remote, err)
|
|
continue
|
|
}
|
|
|
|
// Log delivery attempt
|
|
msgData := res.Marshal(nil)
|
|
log.D.F("attempting delivery of event %s (kind=%d, len=%d) to subscription %s @ %s",
|
|
hex.Enc(ev.ID), ev.Kind, len(msgData), d.id, d.sub.remote)
|
|
|
|
// Get write channel for this connection
|
|
p.Mx.RLock()
|
|
writeChan, hasChan := p.GetWriteChan(d.w)
|
|
stillSubscribed := p.Map[d.w] != nil
|
|
p.Mx.RUnlock()
|
|
|
|
if !stillSubscribed {
|
|
log.D.F("skipping delivery to %s - connection no longer subscribed", d.sub.remote)
|
|
continue
|
|
}
|
|
|
|
if !hasChan {
|
|
log.D.F("skipping delivery to %s - no write channel available", d.sub.remote)
|
|
continue
|
|
}
|
|
|
|
// Send to write channel - non-blocking with timeout
|
|
select {
|
|
case <-p.c.Done():
|
|
continue
|
|
case writeChan <- publish.WriteRequest{Data: msgData, MsgType: websocket.TextMessage, IsControl: false}:
|
|
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s len=%d",
|
|
hex.Enc(ev.ID), d.sub.remote, d.id, len(msgData))
|
|
case <-time.After(DefaultWriteTimeout):
|
|
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s",
|
|
hex.Enc(ev.ID), d.sub.remote, d.id)
|
|
// Check if connection is still valid
|
|
p.Mx.RLock()
|
|
stillSubscribed = p.Map[d.w] != nil
|
|
p.Mx.RUnlock()
|
|
if !stillSubscribed {
|
|
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
|
|
p.removeSubscriber(d.w)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// removeSubscriberId removes a specific subscription from a subscriber
|
|
// websocket.
|
|
func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
|
|
p.Mx.Lock()
|
|
defer p.Mx.Unlock()
|
|
var subs map[string]Subscription
|
|
var ok bool
|
|
if subs, ok = p.Map[ws]; ok {
|
|
delete(subs, id)
|
|
// Check the actual map after deletion, not the original reference
|
|
if len(p.Map[ws]) == 0 {
|
|
delete(p.Map, ws)
|
|
// Don't remove write channel here - it's tied to the connection, not subscriptions
|
|
// The write channel will be removed when the connection closes (in handle-websocket.go defer)
|
|
// This allows new subscriptions to be created on the same connection
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetWriteChan stores the write channel for a websocket connection
|
|
// If writeChan is nil, the entry is removed from the map
|
|
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteRequest) {
|
|
p.Mx.Lock()
|
|
defer p.Mx.Unlock()
|
|
if writeChan == nil {
|
|
delete(p.WriteChans, conn)
|
|
} else {
|
|
p.WriteChans[conn] = writeChan
|
|
}
|
|
}
|
|
|
|
// GetWriteChan returns the write channel for a websocket connection
|
|
func (p *P) GetWriteChan(conn *websocket.Conn) (chan publish.WriteRequest, bool) {
|
|
p.Mx.RLock()
|
|
defer p.Mx.RUnlock()
|
|
ch, ok := p.WriteChans[conn]
|
|
return ch, ok
|
|
}
|
|
|
|
// removeSubscriber removes a websocket from the P collection.
|
|
func (p *P) removeSubscriber(ws *websocket.Conn) {
|
|
p.Mx.Lock()
|
|
defer p.Mx.Unlock()
|
|
clear(p.Map[ws])
|
|
delete(p.Map, ws)
|
|
delete(p.WriteChans, ws)
|
|
}
|
|
|
|
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag
|
|
func (p *P) canSeePrivateEvent(authedPubkey, privatePubkey []byte, remote string) (canSee bool) {
|
|
// If no authenticated user, deny access
|
|
if len(authedPubkey) == 0 {
|
|
return false
|
|
}
|
|
|
|
// If the authenticated user matches the private tag pubkey, allow access
|
|
if len(privatePubkey) > 0 && utils.FastEqual(authedPubkey, privatePubkey) {
|
|
return true
|
|
}
|
|
|
|
// Check if user is an admin or owner (they can see all private events)
|
|
accessLevel := acl.Registry.GetAccessLevel(authedPubkey, remote)
|
|
if accessLevel == "admin" || accessLevel == "owner" {
|
|
return true
|
|
}
|
|
|
|
// Default deny
|
|
return false
|
|
}
|