Files
next.orly.dev/app/sprocket.go
mleku e14b89bc8b Enhance Sprocket functionality and error handling
This commit introduces significant improvements to the Sprocket system, including:

- Detailed documentation in `readme.adoc` for manual updates and failure handling.
- Implementation of automatic disablement of Sprocket on failure, with periodic checks for recovery.
- Enhanced logging for event rejection when Sprocket is disabled or not running.

These changes ensure better user guidance and system resilience during Sprocket failures.
2025-10-09 19:55:20 +01:00

614 lines
15 KiB
Go

package app
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/adrg/xdg"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/encoders/event"
)
// SprocketResponse is one reply from the sprocket script. Each non-empty
// line the script prints on stdout is decoded into this struct; the wire
// field names are given by the json tags.
type SprocketResponse struct {
// ID is the "id" field of the reply.
// NOTE(review): presumably the ID of the event the reply refers to - confirm
// against the script protocol; nothing in this file matches it to a request.
ID string `json:"id"`
Action string `json:"action"` // accept, reject, or shadowReject
Msg string `json:"msg"` // NIP-20 response message (only used for reject)
}
// SprocketManager handles sprocket script execution and management.
//
// It owns at most one running script process at a time, together with the
// pipes used to talk to it, and tracks whether the sprocket is running or
// has been disabled after a failure. Methods take mutex around the process
// state fields (currentCmd, currentCancel, isRunning, disabled and the pipes).
type SprocketManager struct {
// ctx is the manager-lifetime context derived in NewSprocketManager;
// cancel cancels it and is called by Shutdown.
ctx context.Context
cancel context.CancelFunc
// configDir is xdg.ConfigHome joined with the application name; it is also
// used as the working directory of the script process.
configDir string
// scriptPath is configDir/sprocket.sh.
scriptPath string
// currentCmd is the running script process, nil when none is running.
currentCmd *exec.Cmd
// currentCancel cancels the per-process context created in StartSprocket.
currentCancel context.CancelFunc
mutex sync.RWMutex
// isRunning is set by StartSprocket and cleared by StopSprocket and
// monitorProcess.
isRunning bool
// enabled is fixed at construction; when true the constructor launches the
// initial start attempt and the periodic availability check.
enabled bool
disabled bool // true when sprocket is disabled due to failure
// stdin, stdout and stderr are the pipes of the running process.
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
// responseChan carries decoded replies from readResponses to ProcessEvent;
// it is created with a buffer of 100.
responseChan chan SprocketResponse
}
// NewSprocketManager builds a manager whose script lives at
// <xdg config home>/<appName>/sprocket.sh.
//
// The manager's context is derived from ctx, so cancelling ctx also stops
// the manager's background work. When enabled is true two goroutines are
// launched before returning: one tries to start the script if it is present,
// the other re-checks periodically for the script. When enabled is false
// the manager is returned idle.
func NewSprocketManager(ctx context.Context, appName string, enabled bool) *SprocketManager {
	dir := filepath.Join(xdg.ConfigHome, appName)
	managerCtx, stop := context.WithCancel(ctx)
	sm := &SprocketManager{
		ctx:        managerCtx,
		cancel:     stop,
		configDir:  dir,
		scriptPath: filepath.Join(dir, "sprocket.sh"),
		enabled:    enabled,
		// Buffered so the reader goroutine does not have to wait for a consumer.
		responseChan: make(chan SprocketResponse, 100),
	}
	if !enabled {
		return sm
	}
	go sm.startSprocketIfExists()
	go sm.periodicCheck()
	return sm
}
// disableSprocket marks the sprocket as disabled after a failure. The
// warning is logged only on the transition from enabled to disabled, so
// repeated calls are silent.
func (sm *SprocketManager) disableSprocket() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if sm.disabled {
		return
	}
	sm.disabled = true
	log.W.F("sprocket disabled due to failure - all events will be rejected (script location: %s)", sm.scriptPath)
}
// enableSprocket clears the failure-disabled flag and, in the background,
// tries to bring the script process up. It does nothing when the sprocket is
// not currently disabled.
//
// The background attempt re-disables the sprocket when the script is missing
// or fails to start. A process that is already running is treated as success:
// asking StartSprocket to start it again would return an "already running"
// error, and reacting to that by disabling would leave a healthy sprocket
// permanently flagged as disabled.
func (sm *SprocketManager) enableSprocket() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if !sm.disabled {
		return
	}
	sm.disabled = false
	log.I.F("sprocket re-enabled, attempting to start")
	// The goroutine needs the mutex itself (IsRunning, StartSprocket,
	// disableSprocket), so it only makes progress after this method returns.
	go func() {
		if _, err := os.Stat(sm.scriptPath); err != nil {
			log.W.F("sprocket script still not found, keeping disabled")
			sm.disableSprocket()
			return
		}
		if sm.IsRunning() {
			log.I.F("sprocket already running, leaving it enabled")
			return
		}
		if err := sm.StartSprocket(); err != nil {
			log.E.F("failed to restart sprocket: %v", err)
			sm.disableSprocket()
			return
		}
		log.I.F("sprocket restarted successfully")
	}()
}
// periodicCheck wakes every 30 seconds until the manager context is done.
// On each tick, if the sprocket is disabled or not running and the script
// file exists, it either re-enables the sprocket (when disabled) or starts
// it in a background goroutine (when merely not running).
func (sm *SprocketManager) periodicCheck() {
	const interval = 30 * time.Second
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-sm.ctx.Done():
			return
		case <-tick.C:
		}

		// Snapshot both flags under one read lock.
		sm.mutex.RLock()
		isDisabled, isUp := sm.disabled, sm.isRunning
		sm.mutex.RUnlock()

		// Healthy: nothing to do this round.
		if !isDisabled && isUp {
			continue
		}
		// Script still absent: try again on the next tick.
		if _, err := os.Stat(sm.scriptPath); err != nil {
			continue
		}
		if isDisabled {
			sm.enableSprocket()
			continue
		}
		// Enabled but not running: start without blocking the ticker loop.
		go func() {
			startErr := sm.StartSprocket()
			if startErr == nil {
				log.I.F("sprocket restarted successfully")
				return
			}
			log.E.F("failed to restart sprocket: %v", startErr)
			sm.disableSprocket()
		}()
	}
}
// startSprocketIfExists performs the initial start attempt. The sprocket is
// disabled when the script file cannot be stat'ed or when starting it fails.
func (sm *SprocketManager) startSprocketIfExists() {
	if _, statErr := os.Stat(sm.scriptPath); statErr != nil {
		log.W.F("sprocket script not found at %s, disabling sprocket", sm.scriptPath)
		sm.disableSprocket()
		return
	}
	if startErr := sm.StartSprocket(); startErr != nil {
		log.E.F("failed to start sprocket: %v", startErr)
		sm.disableSprocket()
	}
}
// StartSprocket launches the sprocket script as a child process with piped
// stdin, stdout and stderr, and records it as the current process.
//
// It returns an error when a process is already running, when the script
// file does not exist, or when making it executable, creating a pipe, or
// starting the process fails. On any failure no manager state is changed.
//
// On success it marks the sprocket as running, clears the failure-disabled
// flag (a process that just started is no longer in the failed state), and
// launches three goroutines: readResponses, logOutput and monitorProcess.
func (sm *SprocketManager) StartSprocket() error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if sm.isRunning {
		return fmt.Errorf("sprocket is already running")
	}
	if _, err := os.Stat(sm.scriptPath); os.IsNotExist(err) {
		return fmt.Errorf("sprocket script does not exist")
	}
	// Per-process context, derived from the manager context so that shutting
	// the manager down also terminates the script.
	cmdCtx, cmdCancel := context.WithCancel(sm.ctx)
	// Make the script executable
	if err := os.Chmod(sm.scriptPath, 0755); chk.E(err) {
		cmdCancel()
		return fmt.Errorf("failed to make script executable: %v", err)
	}
	// The script runs with the config directory as its working directory.
	cmd := exec.CommandContext(cmdCtx, sm.scriptPath)
	cmd.Dir = sm.configDir
	// Set up stdio pipes for communication; each failure path releases
	// whatever was acquired before it.
	stdin, err := cmd.StdinPipe()
	if chk.E(err) {
		cmdCancel()
		return fmt.Errorf("failed to create stdin pipe: %v", err)
	}
	stdout, err := cmd.StdoutPipe()
	if chk.E(err) {
		cmdCancel()
		stdin.Close()
		return fmt.Errorf("failed to create stdout pipe: %v", err)
	}
	stderr, err := cmd.StderrPipe()
	if chk.E(err) {
		cmdCancel()
		stdin.Close()
		stdout.Close()
		return fmt.Errorf("failed to create stderr pipe: %v", err)
	}
	// Start the command
	if err := cmd.Start(); chk.E(err) {
		cmdCancel()
		stdin.Close()
		stdout.Close()
		stderr.Close()
		return fmt.Errorf("failed to start sprocket: %v", err)
	}
	sm.currentCmd = cmd
	sm.currentCancel = cmdCancel
	sm.stdin = stdin
	sm.stdout = stdout
	sm.stderr = stderr
	sm.isRunning = true
	// A successful start ends the failure state. Without this, a sprocket
	// started while the flag was still set would keep reporting disabled.
	if sm.disabled {
		sm.disabled = false
		log.I.F("sprocket re-enabled after successful start")
	}
	// Decode replies from the script in the background
	go sm.readResponses()
	// Hand both output pipes to logOutput in the background
	go sm.logOutput(stdout, stderr)
	// Monitor the process
	go sm.monitorProcess()
	log.I.F("sprocket started (pid=%d)", cmd.Process.Pid)
	return nil
}
// StopSprocket stops the sprocket script gracefully, with SIGKILL fallback.
//
// It closes the script's stdin so the script sees end-of-input, then waits
// up to five seconds for the process to exit. If it has not exited by then
// the process is killed. The per-process context is cancelled only after the
// wait: the command was created with exec.CommandContext, so cancelling it
// first would kill the process immediately and the graceful path would never
// be taken.
//
// The manager mutex is held for the whole call, including the wait. It
// returns an error when no process is running.
func (sm *SprocketManager) StopSprocket() error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if !sm.isRunning || sm.currentCmd == nil {
		return fmt.Errorf("sprocket is not running")
	}
	cmd := sm.currentCmd
	// Close stdin first to signal the script to exit
	if sm.stdin != nil {
		sm.stdin.Close()
		sm.stdin = nil
	}
	// Wait for graceful shutdown with timeout
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()
	select {
	case <-done:
		// Process exited within the grace period
		log.I.F("sprocket stopped gracefully")
	case <-time.After(5 * time.Second):
		// Force kill after 5 seconds
		log.W.F("sprocket did not stop gracefully, sending SIGKILL")
		if err := cmd.Process.Kill(); chk.E(err) {
			log.E.F("failed to kill sprocket process: %v", err)
		}
		<-done // Wait for the kill to complete
	}
	// The process is gone; release the per-process context.
	if sm.currentCancel != nil {
		sm.currentCancel()
	}
	// Clean up the remaining pipes
	if sm.stdout != nil {
		sm.stdout.Close()
		sm.stdout = nil
	}
	if sm.stderr != nil {
		sm.stderr.Close()
		sm.stderr = nil
	}
	sm.isRunning = false
	sm.currentCmd = nil
	sm.currentCancel = nil
	return nil
}
// RestartSprocket stops the sprocket script if it is running and then starts
// it again. It returns the stop error (wrapped in a message) if stopping
// fails, otherwise whatever StartSprocket returns.
func (sm *SprocketManager) RestartSprocket() error {
	// Read the running flag through the locked accessor; the field is written
	// under the mutex by other goroutines.
	if sm.IsRunning() {
		if err := sm.StopSprocket(); chk.E(err) {
			return fmt.Errorf("failed to stop sprocket: %v", err)
		}
		// Give it a moment to fully stop
		time.Sleep(100 * time.Millisecond)
	}
	return sm.StartSprocket()
}
// UpdateSprocket replaces the sprocket script with scriptContent.
//
// Empty (or whitespace-only) content means "remove": the running process is
// stopped and the script file deleted.
//
// Otherwise the new content is first written to a staging file next to the
// script. Only once that write has succeeded is the existing script moved
// aside as a timestamped backup (sprocket.sh.<YYYYMMDDhhmmss>) and the
// staging file renamed into place, so a failed write never leaves the
// manager without a script. If the final rename fails, the backup is moved
// back. When a process was running it is restarted to pick up the new
// script; otherwise the file is just replaced.
func (sm *SprocketManager) UpdateSprocket(scriptContent string) error {
	// Ensure config directory exists
	if err := os.MkdirAll(sm.configDir, 0755); chk.E(err) {
		return fmt.Errorf("failed to create config directory: %v", err)
	}
	// If script content is empty, delete the script and stop
	if strings.TrimSpace(scriptContent) == "" {
		if sm.IsRunning() {
			if err := sm.StopSprocket(); chk.E(err) {
				log.E.F("failed to stop sprocket before deletion: %v", err)
			}
		}
		if _, err := os.Stat(sm.scriptPath); err == nil {
			if err := os.Remove(sm.scriptPath); chk.E(err) {
				return fmt.Errorf("failed to delete sprocket script: %v", err)
			}
			log.I.F("sprocket script deleted")
		}
		return nil
	}
	// Stage the new script before touching the live one.
	tempPath := sm.scriptPath + ".tmp"
	if err := os.WriteFile(tempPath, []byte(scriptContent), 0755); chk.E(err) {
		// Best effort: do not leave a partial staging file behind.
		_ = os.Remove(tempPath)
		return fmt.Errorf("failed to write temporary sprocket script: %v", err)
	}
	// Move the existing script aside as a backup. A failed backup is only a
	// warning; the update still proceeds and overwrites the script.
	backupPath := ""
	if _, err := os.Stat(sm.scriptPath); err == nil {
		candidate := sm.scriptPath + "." + time.Now().Format("20060102150405")
		if err := os.Rename(sm.scriptPath, candidate); chk.E(err) {
			log.W.F("failed to create backup: %v", err)
		} else {
			backupPath = candidate
			log.I.F("created backup: %s", backupPath)
		}
	}
	wasRunning := sm.IsRunning()
	// Atomically put the staged script in place.
	if err := os.Rename(tempPath, sm.scriptPath); chk.E(err) {
		_ = os.Remove(tempPath) // Clean up temp file
		if backupPath != "" {
			// Put the previous script back so the manager is not left without one.
			if restoreErr := os.Rename(backupPath, sm.scriptPath); chk.E(restoreErr) {
				log.E.F("failed to restore sprocket script from backup %s: %v", backupPath, restoreErr)
			}
		}
		return fmt.Errorf("failed to replace sprocket script: %v", err)
	}
	if !wasRunning {
		log.I.F("sprocket script updated")
		return nil
	}
	log.I.F("sprocket script updated atomically")
	// Restart the sprocket process so it runs the new script
	return sm.RestartSprocket()
}
// GetSprocketStatus returns a snapshot of the sprocket state as a map with
// these keys:
//
//	is_running      bool       whether a script process is running
//	script_exists   bool       whether the script file could be stat'ed
//	script_path     string     location of the script
//	script_content  string     script text (only if the file was readable)
//	script_modified time.Time  modification time (only if the file exists)
//	pid             int        process id (only while running)
//
// The file is stat'ed once and that result supplies both script_exists and
// script_modified, so the two cannot disagree.
func (sm *SprocketManager) GetSprocketStatus() map[string]interface{} {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	status := map[string]interface{}{
		"is_running":    sm.isRunning,
		"script_exists": false,
		"script_path":   sm.scriptPath,
	}
	if info, err := os.Stat(sm.scriptPath); err == nil {
		status["script_exists"] = true
		status["script_modified"] = info.ModTime()
		// Content is optional: an unreadable script still counts as existing.
		if content, err := os.ReadFile(sm.scriptPath); err == nil {
			status["script_content"] = string(content)
		}
	}
	if sm.isRunning && sm.currentCmd != nil && sm.currentCmd.Process != nil {
		status["pid"] = sm.currentCmd.Process.Pid
	}
	return status
}
// GetSprocketVersions lists the current script followed by every backup
// found next to it (regular entries named "sprocket.sh.<suffix>").
//
// Each entry is a map with the keys name, path, modified, content and
// is_current. Files that cannot be stat'ed or read are skipped. The staging
// file UpdateSprocket writes before swapping in a new script
// ("sprocket.sh.tmp") is not a version and is left out.
//
// Listing is best effort: if the directory cannot be read, whatever was
// collected so far is returned with a nil error. The returned slice is never
// nil.
func (sm *SprocketManager) GetSprocketVersions() ([]map[string]interface{}, error) {
	versions := []map[string]interface{}{}
	// add appends one version entry, silently skipping unreadable files.
	add := func(name, path string, current bool) {
		info, err := os.Stat(path)
		if err != nil {
			return
		}
		content, err := os.ReadFile(path)
		if err != nil {
			return
		}
		versions = append(versions, map[string]interface{}{
			"name":       name,
			"path":       path,
			"modified":   info.ModTime(),
			"content":    string(content),
			"is_current": current,
		})
	}
	// Current script first
	add("sprocket.sh", sm.scriptPath, true)
	// Then the backups stored alongside it
	dir := filepath.Dir(sm.scriptPath)
	entries, err := os.ReadDir(dir)
	if chk.E(err) {
		return versions, nil
	}
	stagingName := filepath.Base(sm.scriptPath) + ".tmp"
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasPrefix(name, "sprocket.sh.") || name == stagingName {
			continue
		}
		add(name, filepath.Join(dir, name), false)
	}
	return versions, nil
}
// DeleteSprocketVersion deletes one backup version of the sprocket script
// from the config directory.
//
// filename must be a bare file name of the form "sprocket.sh.<suffix>", the
// same names GetSprocketVersions reports for backups. The current script
// ("sprocket.sh") cannot be deleted this way. Names containing a path
// separator or otherwise resolving outside that pattern are rejected, so a
// caller-supplied name such as "../../somefile" cannot remove files outside
// the set of backups.
func (sm *SprocketManager) DeleteSprocketVersion(filename string) error {
	// Don't allow deleting the current script
	if filename == "sprocket.sh" {
		return fmt.Errorf("cannot delete current sprocket script")
	}
	// filepath.Base strips any directory part; a name that differs from its
	// own base carries path components and is refused.
	if filename != filepath.Base(filename) || !strings.HasPrefix(filename, "sprocket.sh.") {
		return fmt.Errorf("invalid sprocket version name: %q", filename)
	}
	path := filepath.Join(sm.configDir, filename)
	if err := os.Remove(path); chk.E(err) {
		return fmt.Errorf("failed to delete sprocket version: %v", err)
	}
	log.I.F("deleted sprocket version: %s", filename)
	return nil
}
// logOutput forwards the script's stderr to this process's stderr until the
// pipe is closed. It blocks for that whole time, so it is meant to be run in
// its own goroutine (as StartSprocket does).
//
// stdout is deliberately left alone: it is the reply channel consumed by
// readResponses, and a second reader on the same pipe would take reply lines
// away from it. Neither pipe is closed here; closing them while they are
// still being read would cut off both the replies and the log output.
// Closing is done by the process cleanup paths.
func (sm *SprocketManager) logOutput(stdout, stderr io.ReadCloser) {
	_ = stdout // owned by readResponses
	if stderr == nil {
		return
	}
	// The copy ends with a closed-pipe error once the process has exited and
	// the pipe is torn down; that is the normal way out, not a failure.
	_, _ = io.Copy(os.Stderr, stderr)
}
// ProcessEvent sends an event to the sprocket script and waits for a
// response.
//
// The event is serialized to JSON and written to the script's stdin as a
// single newline-terminated line. The call then waits for the next reply on
// the response channel, giving up after five seconds or when the manager
// context is cancelled.
//
// It returns an error when the sprocket is not running, when the event
// cannot be serialized or written, on timeout, and on cancellation.
//
// NOTE(review): replies are taken from a channel shared by all callers and
// are not matched to the event that caused them. With concurrent callers, or
// after a timeout whose reply arrives late, a caller can receive a reply
// meant for a different event. SprocketResponse.ID looks intended for this
// matching - confirm and correlate.
func (sm *SprocketManager) ProcessEvent(evt *event.E) (*SprocketResponse, error) {
	sm.mutex.RLock()
	if !sm.isRunning || sm.stdin == nil {
		sm.mutex.RUnlock()
		return nil, fmt.Errorf("sprocket is not running")
	}
	stdin := sm.stdin
	sm.mutex.RUnlock()
	// Serialize the event to JSON
	eventJSON, err := json.Marshal(evt)
	if chk.E(err) {
		return nil, fmt.Errorf("failed to serialize event: %v", err)
	}
	// Terminate the record with a newline. json.Marshal emits none, and a
	// script that reads its input line by line would otherwise never see a
	// complete line. Trailing whitespace is harmless to a streaming JSON
	// reader. Event and terminator go out in one Write call.
	eventJSON = append(eventJSON, '\n')
	if _, err := stdin.Write(eventJSON); chk.E(err) {
		return nil, fmt.Errorf("failed to write event to sprocket: %v", err)
	}
	// Wait for response with timeout
	select {
	case response := <-sm.responseChan:
		return &response, nil
	case <-time.After(5 * time.Second):
		return nil, fmt.Errorf("sprocket response timeout")
	case <-sm.ctx.Done():
		return nil, fmt.Errorf("sprocket context cancelled")
	}
}
// readResponses reads JSONL responses from the sprocket script: one JSON
// object per line on the script's stdout. Each decoded reply is offered to
// the response channel without blocking; if the channel is full the reply is
// dropped with a warning. Blank lines are skipped and lines that fail to
// decode are logged and skipped. The function returns when the pipe reaches
// end of input or a read fails.
func (sm *SprocketManager) readResponses() {
	// Take the pipe under the lock: the field is reset by the cleanup paths,
	// which run on other goroutines.
	sm.mutex.RLock()
	stdout := sm.stdout
	sm.mutex.RUnlock()
	if stdout == nil {
		return
	}
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		// Bytes is only valid until the next Scan; it is fully consumed by
		// Unmarshal before then.
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		var response SprocketResponse
		if err := json.Unmarshal(line, &response); chk.E(err) {
			log.E.F("failed to parse sprocket response: %v", err)
			continue
		}
		// Send response to channel (non-blocking)
		select {
		case sm.responseChan <- response:
		default:
			log.W.F("sprocket response channel full, dropping response")
		}
	}
	if err := scanner.Err(); chk.E(err) {
		log.E.F("error reading sprocket responses: %v", err)
	}
}
// IsEnabled returns whether sprocket is enabled, i.e. the value of the
// enabled argument the manager was constructed with. The field is only
// assigned in NewSprocketManager, so it is read here without taking the
// mutex.
func (sm *SprocketManager) IsEnabled() bool {
return sm.enabled
}
// IsRunning reports whether a sprocket script process is currently recorded
// as running. The flag is read under the manager's read lock.
func (sm *SprocketManager) IsRunning() bool {
	sm.mutex.RLock()
	running := sm.isRunning
	sm.mutex.RUnlock()
	return running
}
// IsDisabled reports whether the sprocket has been disabled because of a
// failure. The flag is read under the manager's read lock.
func (sm *SprocketManager) IsDisabled() bool {
	sm.mutex.RLock()
	off := sm.disabled
	sm.mutex.RUnlock()
	return off
}
// monitorProcess waits for the current sprocket process to exit and then
// cleans up the manager state that belongs to it: pipes are closed, the
// per-process context is cancelled, and the running flag is cleared. An exit
// with an error additionally disables the sprocket.
//
// The cleanup only happens if the process this goroutine waited on is still
// the manager's current process. If it has meanwhile been stopped or
// replaced (StopSprocket, or a restart), the state now belongs to someone
// else; touching it would close the new process's pipes, mark it as not
// running, and flag the sprocket as disabled because the old process was
// killed.
func (sm *SprocketManager) monitorProcess() {
	// Remember which process this goroutine is responsible for.
	sm.mutex.RLock()
	cmd := sm.currentCmd
	sm.mutex.RUnlock()
	if cmd == nil {
		return
	}
	err := cmd.Wait()
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if sm.currentCmd != cmd {
		// Already handled elsewhere; nothing of ours is left to clean up.
		return
	}
	// Clean up pipes
	if sm.stdin != nil {
		sm.stdin.Close()
		sm.stdin = nil
	}
	if sm.stdout != nil {
		sm.stdout.Close()
		sm.stdout = nil
	}
	if sm.stderr != nil {
		sm.stderr.Close()
		sm.stderr = nil
	}
	// Release the per-process context; dropping the cancel function without
	// calling it would keep the context alive until the manager shuts down.
	if sm.currentCancel != nil {
		sm.currentCancel()
	}
	sm.isRunning = false
	sm.currentCmd = nil
	sm.currentCancel = nil
	if err != nil {
		log.E.F("sprocket process exited with error: %v", err)
		// Auto-disable sprocket on failure. The flag is set directly because
		// the mutex is already held here.
		sm.disabled = true
		log.W.F("sprocket disabled due to process failure - all events will be rejected (script location: %s)", sm.scriptPath)
	} else {
		log.I.F("sprocket process exited normally")
	}
}
// Shutdown cancels the manager context, which ends the periodic check and
// the context the script process runs under, and then stops the script
// process if one is still recorded as running.
func (sm *SprocketManager) Shutdown() {
	sm.cancel()
	// Use the locked accessor; the flag is written by other goroutines.
	if sm.IsRunning() {
		// The process may already have been cleaned up between the check and
		// the call, in which case StopSprocket reports "not running". That is
		// the desired end state during shutdown, so the error is ignored.
		_ = sm.StopSprocket()
	}
}