This commit introduces significant improvements to the Sprocket system, including:
- Detailed documentation in `readme.adoc` for manual updates and failure handling.
- Implementation of automatic disablement of Sprocket on failure, with periodic checks for recovery.
- Enhanced logging for event rejection when Sprocket is disabled or not running.

These changes ensure better user guidance and system resilience during Sprocket failures.
258 lines
6.6 KiB
Go
258 lines
6.6 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"lol.mleku.dev/chk"
|
|
"lol.mleku.dev/log"
|
|
"next.orly.dev/pkg/acl"
|
|
"next.orly.dev/pkg/encoders/envelopes/authenvelope"
|
|
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
|
|
"next.orly.dev/pkg/encoders/envelopes/okenvelope"
|
|
"next.orly.dev/pkg/encoders/kind"
|
|
"next.orly.dev/pkg/encoders/reason"
|
|
"next.orly.dev/pkg/utils"
|
|
)
|
|
|
|
func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|
log.D.F("handling event: %s", msg)
|
|
// decode the envelope
|
|
env := eventenvelope.NewSubmission()
|
|
if msg, err = env.Unmarshal(msg); chk.E(err) {
|
|
return
|
|
}
|
|
defer func() {
|
|
if env != nil && env.E != nil {
|
|
env.E.Free()
|
|
}
|
|
}()
|
|
if len(msg) > 0 {
|
|
log.I.F("extra '%s'", msg)
|
|
}
|
|
|
|
// Check if sprocket is enabled and process event through it
|
|
if l.sprocketManager != nil && l.sprocketManager.IsEnabled() {
|
|
if l.sprocketManager.IsDisabled() {
|
|
// Sprocket is disabled due to failure - reject all events
|
|
log.W.F("sprocket is disabled, rejecting event %0x", env.E.ID)
|
|
if err = Ok.Error(
|
|
l, env, "sprocket disabled - events rejected until sprocket is restored",
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
if !l.sprocketManager.IsRunning() {
|
|
// Sprocket is enabled but not running - reject all events
|
|
log.W.F("sprocket is enabled but not running, rejecting event %0x", env.E.ID)
|
|
if err = Ok.Error(
|
|
l, env, "sprocket not running - events rejected until sprocket starts",
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Process event through sprocket
|
|
response, sprocketErr := l.sprocketManager.ProcessEvent(env.E)
|
|
if chk.E(sprocketErr) {
|
|
log.E.F("sprocket processing failed: %v", sprocketErr)
|
|
if err = Ok.Error(
|
|
l, env, "sprocket processing failed",
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Handle sprocket response
|
|
switch response.Action {
|
|
case "accept":
|
|
// Continue with normal processing
|
|
log.D.F("sprocket accepted event %0x", env.E.ID)
|
|
case "reject":
|
|
// Return OK false with message
|
|
if err = okenvelope.NewFrom(
|
|
env.Id(), false,
|
|
reason.Error.F(response.Msg),
|
|
).Write(l); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
case "shadowReject":
|
|
// Return OK true but abort processing
|
|
if err = Ok.Ok(l, env, ""); chk.E(err) {
|
|
return
|
|
}
|
|
log.D.F("sprocket shadow rejected event %0x", env.E.ID)
|
|
return
|
|
default:
|
|
log.W.F("unknown sprocket action: %s", response.Action)
|
|
// Default to accept for unknown actions
|
|
}
|
|
}
|
|
// check the event ID is correct
|
|
calculatedId := env.E.GetIDBytes()
|
|
if !utils.FastEqual(calculatedId, env.E.ID) {
|
|
if err = Ok.Invalid(
|
|
l, env, "event id is computed incorrectly, "+
|
|
"event has ID %0x, but when computed it is %0x",
|
|
env.E.ID, calculatedId,
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
// verify the signature
|
|
var ok bool
|
|
if ok, err = env.Verify(); chk.T(err) {
|
|
if err = Ok.Error(
|
|
l, env, fmt.Sprintf(
|
|
"failed to verify signature: %s",
|
|
err.Error(),
|
|
),
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
} else if !ok {
|
|
if err = Ok.Invalid(
|
|
l, env,
|
|
"signature is invalid",
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
// check permissions of user
|
|
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
|
|
switch accessLevel {
|
|
case "none":
|
|
log.D.F(
|
|
"handle event: sending 'OK,false,auth-required...' to %s", l.remote,
|
|
)
|
|
if err = okenvelope.NewFrom(
|
|
env.Id(), false,
|
|
reason.AuthRequired.F("auth required for write access"),
|
|
).Write(l); chk.E(err) {
|
|
// return
|
|
}
|
|
log.D.F("handle event: sending challenge to %s", l.remote)
|
|
if err = authenvelope.NewChallengeWith(l.challenge.Load()).
|
|
Write(l); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
case "read":
|
|
log.D.F(
|
|
"handle event: sending 'OK,false,auth-required:...' to %s",
|
|
l.remote,
|
|
)
|
|
if err = okenvelope.NewFrom(
|
|
env.Id(), false,
|
|
reason.AuthRequired.F("auth required for write access"),
|
|
).Write(l); chk.E(err) {
|
|
return
|
|
}
|
|
log.D.F("handle event: sending challenge to %s", l.remote)
|
|
if err = authenvelope.NewChallengeWith(l.challenge.Load()).
|
|
Write(l); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
default:
|
|
// user has write access or better, continue
|
|
// log.D.F("user has %s access", accessLevel)
|
|
}
|
|
// check for protected tag (NIP-70)
|
|
protectedTag := env.E.Tags.GetFirst([]byte("-"))
|
|
if protectedTag != nil && acl.Registry.Active.Load() != "none" {
|
|
// check that the pubkey of the event matches the authed pubkey
|
|
if !utils.FastEqual(l.authedPubkey.Load(), env.E.Pubkey) {
|
|
if err = Ok.Blocked(
|
|
l, env,
|
|
"protected tag may only be published by user authed to the same pubkey",
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
}
|
|
// if the event is a delete, process the delete
|
|
if env.E.Kind == kind.EventDeletion.K {
|
|
if err = l.HandleDelete(env); err != nil {
|
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
|
if err = Ok.Error(
|
|
l, env, errStr,
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
}
|
|
} else {
|
|
// check if the event was deleted
|
|
if err = l.CheckForDeleted(env.E, l.Admins); err != nil {
|
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
|
if err = Ok.Error(
|
|
l, env, errStr,
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// store the event - use a separate context to prevent cancellation issues
|
|
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
|
|
if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
|
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
|
if err = Ok.Error(
|
|
l, env, errStr,
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
chk.E(err)
|
|
return
|
|
}
|
|
// Send a success response storing
|
|
if err = Ok.Ok(l, env, ""); chk.E(err) {
|
|
return
|
|
}
|
|
// Deliver the event to subscribers immediately after sending OK response
|
|
// Clone the event to prevent corruption when the original is freed
|
|
clonedEvent := env.E.Clone()
|
|
go l.publishers.Deliver(clonedEvent)
|
|
log.D.F("saved event %0x", env.E.ID)
|
|
var isNewFromAdmin bool
|
|
for _, admin := range l.Admins {
|
|
if utils.FastEqual(admin, env.E.Pubkey) {
|
|
isNewFromAdmin = true
|
|
break
|
|
}
|
|
}
|
|
if isNewFromAdmin {
|
|
log.I.F("new event from admin %0x", env.E.Pubkey)
|
|
// if a follow list was saved, reconfigure ACLs now that it is persisted
|
|
if env.E.Kind == kind.FollowList.K ||
|
|
env.E.Kind == kind.RelayListMetadata.K {
|
|
// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
|
|
go func() {
|
|
if err := acl.Registry.Configure(); chk.E(err) {
|
|
log.E.F("failed to reconfigure ACL: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
return
|
|
}
|