diff --git a/pkg/policy/benchmark_test.go b/pkg/policy/benchmark_test.go index c3f4001..5b7a839 100644 --- a/pkg/policy/benchmark_test.go +++ b/pkg/policy/benchmark_test.go @@ -104,21 +104,25 @@ done b.Fatalf("Failed to create test script: %v", err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + manager := &PolicyManager{ - ctx: ctx, - configDir: tempDir, - scriptPath: scriptPath, - enabled: true, - responseChan: make(chan PolicyResponse, 100), + ctx: ctx, + cancel: cancel, + configDir: tempDir, + scriptPath: scriptPath, + enabled: true, + runners: make(map[string]*ScriptRunner), } - // Start the policy manager - err = manager.StartPolicy() + // Get or create runner and start it + runner := manager.getOrCreateRunner(scriptPath) + err = runner.Start() if err != nil { - b.Fatalf("Failed to start policy: %v", err) + b.Fatalf("Failed to start policy script: %v", err) } - defer manager.StopPolicy() + defer runner.Stop() // Give the script time to start time.Sleep(100 * time.Millisecond) diff --git a/pkg/policy/policy.go b/pkg/policy/policy.go index 1d0086c..23757ea 100644 --- a/pkg/policy/policy.go +++ b/pkg/policy/policy.go @@ -119,10 +119,9 @@ type PolicyResponse struct { Msg string `json:"msg"` // NIP-20 response message (only used for reject) } -// PolicyManager handles policy script execution and management. -// It manages the lifecycle of policy scripts, handles communication with them, -// and provides resilient operation with automatic restart capabilities. -type PolicyManager struct { +// ScriptRunner manages a single policy script process. +// Each unique script path gets its own independent runner with its own goroutine. +type ScriptRunner struct { ctx context.Context cancel context.CancelFunc configDir string @@ -132,7 +131,6 @@ type PolicyManager struct { mutex sync.RWMutex isRunning bool isStarting bool - enabled bool stdin io.WriteCloser stdout io.ReadCloser stderr io.ReadCloser @@ -140,6 +138,20 @@ type PolicyManager struct { startupChan chan error } +// PolicyManager handles multiple policy script runners. +// It manages the lifecycle of policy scripts, handles communication with them, +// and provides resilient operation with automatic restart capabilities. +// Each unique script path gets its own ScriptRunner instance. +type PolicyManager struct { + ctx context.Context + cancel context.CancelFunc + configDir string + scriptPath string // Default script path for backward compatibility + enabled bool + mutex sync.RWMutex + runners map[string]*ScriptRunner // Map of script path -> runner +} + // P represents a complete policy configuration for a Nostr relay. // It defines access control rules, kind filtering, and default behavior. // Policies are evaluated in order: global rules, kind filtering, specific rules, then default policy. @@ -199,13 +211,12 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P { ctx, cancel := context.WithCancel(ctx) manager := &PolicyManager{ - ctx: ctx, - cancel: cancel, - configDir: configDir, - scriptPath: scriptPath, - enabled: enabled, - responseChan: make(chan PolicyResponse, 100), // Buffered channel for responses - startupChan: make(chan error, 1), // Channel for startup completion + ctx: ctx, + cancel: cancel, + configDir: configDir, + scriptPath: scriptPath, + enabled: enabled, + runners: make(map[string]*ScriptRunner), } // Load policy configuration from JSON file @@ -231,6 +242,406 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P { return policy } +// getOrCreateRunner gets an existing runner for the script path or creates a new one. +// This method is thread-safe and ensures only one runner exists per unique script path. +func (pm *PolicyManager) getOrCreateRunner(scriptPath string) *ScriptRunner { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + // Check if runner already exists + if runner, exists := pm.runners[scriptPath]; exists { + return runner + } + + // Create new runner + runnerCtx, runnerCancel := context.WithCancel(pm.ctx) + runner := &ScriptRunner{ + ctx: runnerCtx, + cancel: runnerCancel, + configDir: pm.configDir, + scriptPath: scriptPath, + responseChan: make(chan PolicyResponse, 100), + startupChan: make(chan error, 1), + } + + pm.runners[scriptPath] = runner + + // Start periodic check for this runner + go runner.periodicCheck() + + return runner +} + +// ScriptRunner methods + +// IsRunning returns whether the script is currently running. +func (sr *ScriptRunner) IsRunning() bool { + sr.mutex.RLock() + defer sr.mutex.RUnlock() + return sr.isRunning +} + +// ensureRunning ensures the script is running, starting it if necessary. +func (sr *ScriptRunner) ensureRunning() error { + sr.mutex.Lock() + // Check if already running + if sr.isRunning { + sr.mutex.Unlock() + return nil + } + + // Check if already starting + if sr.isStarting { + sr.mutex.Unlock() + // Wait for startup to complete + select { + case err := <-sr.startupChan: + if err != nil { + return fmt.Errorf("script startup failed: %v", err) + } + // Double-check it's actually running after receiving signal + sr.mutex.RLock() + running := sr.isRunning + sr.mutex.RUnlock() + if !running { + return fmt.Errorf("script startup completed but process is not running") + } + return nil + case <-time.After(10 * time.Second): + return fmt.Errorf("script startup timeout") + case <-sr.ctx.Done(): + return fmt.Errorf("script context cancelled") + } + } + + // Mark as starting + sr.isStarting = true + sr.mutex.Unlock() + + // Start the script in a goroutine + go func() { + err := sr.Start() + sr.mutex.Lock() + sr.isStarting = false + sr.mutex.Unlock() + // Signal startup completion (non-blocking) + // Drain any stale value first, then send + select { + case <-sr.startupChan: + default: + } + select { + case sr.startupChan <- err: + default: + // Channel should be empty now, but if it's full, try again + sr.startupChan <- err + } + }() + + // Wait for startup to complete + select { + case err := <-sr.startupChan: + if err != nil { + return fmt.Errorf("script startup failed: %v", err) + } + // Double-check it's actually running after receiving signal + sr.mutex.RLock() + running := sr.isRunning + sr.mutex.RUnlock() + if !running { + return fmt.Errorf("script startup completed but process is not running") + } + return nil + case <-time.After(10 * time.Second): + sr.mutex.Lock() + sr.isStarting = false + sr.mutex.Unlock() + return fmt.Errorf("script startup timeout") + case <-sr.ctx.Done(): + sr.mutex.Lock() + sr.isStarting = false + sr.mutex.Unlock() + return fmt.Errorf("script context cancelled") + } +} + +// Start starts the script process. +func (sr *ScriptRunner) Start() error { + sr.mutex.Lock() + defer sr.mutex.Unlock() + + if sr.isRunning { + return fmt.Errorf("script is already running") + } + + if _, err := os.Stat(sr.scriptPath); os.IsNotExist(err) { + return fmt.Errorf("script does not exist at %s", sr.scriptPath) + } + + // Create a new context for this command + cmdCtx, cmdCancel := context.WithCancel(sr.ctx) + + // Make the script executable + if err := os.Chmod(sr.scriptPath, 0755); chk.E(err) { + cmdCancel() + return fmt.Errorf("failed to make script executable: %v", err) + } + + // Start the script + cmd := exec.CommandContext(cmdCtx, sr.scriptPath) + cmd.Dir = sr.configDir + + // Set up stdio pipes for communication + stdin, err := cmd.StdinPipe() + if chk.E(err) { + cmdCancel() + return fmt.Errorf("failed to create stdin pipe: %v", err) + } + + stdout, err := cmd.StdoutPipe() + if chk.E(err) { + cmdCancel() + stdin.Close() + return fmt.Errorf("failed to create stdout pipe: %v", err) + } + + stderr, err := cmd.StderrPipe() + if chk.E(err) { + cmdCancel() + stdin.Close() + stdout.Close() + return fmt.Errorf("failed to create stderr pipe: %v", err) + } + + // Start the command + if err := cmd.Start(); chk.E(err) { + cmdCancel() + stdin.Close() + stdout.Close() + stderr.Close() + return fmt.Errorf("failed to start script: %v", err) + } + + sr.currentCmd = cmd + sr.currentCancel = cmdCancel + sr.stdin = stdin + sr.stdout = stdout + sr.stderr = stderr + sr.isRunning = true + + // Start response reader in background + go sr.readResponses() + + // Log stderr output in background + go sr.logOutput(stdout, stderr) + + // Monitor the process + go sr.monitorProcess() + + log.I.F("policy script started: %s (pid=%d)", sr.scriptPath, cmd.Process.Pid) + return nil +} + +// Stop stops the script gracefully. +func (sr *ScriptRunner) Stop() error { + sr.mutex.Lock() + defer sr.mutex.Unlock() + + if !sr.isRunning || sr.currentCmd == nil { + return fmt.Errorf("script is not running") + } + + // Close stdin first to signal the script to exit + if sr.stdin != nil { + sr.stdin.Close() + } + + // Cancel the context + if sr.currentCancel != nil { + sr.currentCancel() + } + + // Wait for graceful shutdown with timeout + done := make(chan error, 1) + go func() { + done <- sr.currentCmd.Wait() + }() + + select { + case <-done: + // Process exited gracefully + log.I.F("policy script stopped: %s", sr.scriptPath) + case <-time.After(5 * time.Second): + // Force kill after 5 seconds + log.W.F("policy script did not stop gracefully, sending SIGKILL: %s", sr.scriptPath) + if err := sr.currentCmd.Process.Kill(); chk.E(err) { + log.E.F("failed to kill script process: %v", err) + } + <-done // Wait for the kill to complete + } + + // Clean up pipes + if sr.stdin != nil { + sr.stdin.Close() + sr.stdin = nil + } + if sr.stdout != nil { + sr.stdout.Close() + sr.stdout = nil + } + if sr.stderr != nil { + sr.stderr.Close() + sr.stderr = nil + } + + sr.isRunning = false + sr.currentCmd = nil + sr.currentCancel = nil + + return nil +} + +// ProcessEvent sends an event to the script and waits for a response. +func (sr *ScriptRunner) ProcessEvent(evt *PolicyEvent) (*PolicyResponse, error) { + sr.mutex.RLock() + if !sr.isRunning || sr.stdin == nil { + sr.mutex.RUnlock() + return nil, fmt.Errorf("script is not running") + } + stdin := sr.stdin + sr.mutex.RUnlock() + + // Serialize the event to JSON + eventJSON, err := json.Marshal(evt) + if chk.E(err) { + return nil, fmt.Errorf("failed to serialize event: %v", err) + } + + // Send the event JSON to the script (newline-terminated) + if _, err := stdin.Write(append(eventJSON, '\n')); chk.E(err) { + return nil, fmt.Errorf("failed to write event to script: %v", err) + } + + // Wait for response with timeout + select { + case response := <-sr.responseChan: + return &response, nil + case <-time.After(5 * time.Second): + return nil, fmt.Errorf("script response timeout") + case <-sr.ctx.Done(): + return nil, fmt.Errorf("script context cancelled") + } +} + +// readResponses reads JSONL responses from the script +func (sr *ScriptRunner) readResponses() { + if sr.stdout == nil { + return + } + + scanner := bufio.NewScanner(sr.stdout) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + + var response PolicyResponse + if err := json.Unmarshal([]byte(line), &response); chk.E(err) { + log.E.F("failed to parse policy response from %s: %v", sr.scriptPath, err) + continue + } + + // Send response to channel (non-blocking) + select { + case sr.responseChan <- response: + default: + log.W.F("policy response channel full for %s, dropping response", sr.scriptPath) + } + } + + if err := scanner.Err(); chk.E(err) { + log.E.F("error reading policy responses from %s: %v", sr.scriptPath, err) + } +} + +// logOutput logs the output from stderr +func (sr *ScriptRunner) logOutput(stdout, stderr io.ReadCloser) { + defer stderr.Close() + + // Only log stderr, stdout is used by readResponses + go func() { + io.Copy(os.Stderr, stderr) + }() +} + +// monitorProcess monitors the script process and cleans up when it exits +func (sr *ScriptRunner) monitorProcess() { + if sr.currentCmd == nil { + return + } + + err := sr.currentCmd.Wait() + + sr.mutex.Lock() + defer sr.mutex.Unlock() + + // Clean up pipes + if sr.stdin != nil { + sr.stdin.Close() + sr.stdin = nil + } + if sr.stdout != nil { + sr.stdout.Close() + sr.stdout = nil + } + if sr.stderr != nil { + sr.stderr.Close() + sr.stderr = nil + } + + sr.isRunning = false + sr.currentCmd = nil + sr.currentCancel = nil + + if err != nil { + log.E.F("policy script exited with error: %s: %v, will retry periodically", sr.scriptPath, err) + } else { + log.I.F("policy script exited normally: %s", sr.scriptPath) + } +} + +// periodicCheck periodically checks if script becomes available and attempts to restart failed scripts. +func (sr *ScriptRunner) periodicCheck() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + + for { + select { + case <-sr.ctx.Done(): + return + case <-ticker.C: + sr.mutex.RLock() + running := sr.isRunning + sr.mutex.RUnlock() + + // Check if script is not running and try to start it + if !running { + if _, err := os.Stat(sr.scriptPath); err == nil { + // Script exists but not running, try to start + go func() { + if err := sr.Start(); err != nil { + log.E.F("failed to restart policy script %s: %v, will retry in next cycle", sr.scriptPath, err) + } else { + log.I.F("policy script restarted successfully: %s", sr.scriptPath) + } + }() + } + } + } + } +} + // LoadFromFile loads policy configuration from a JSON file. // Returns an error if the file doesn't exist, can't be read, or contains invalid JSON. func (p *P) LoadFromFile(configPath string) error { @@ -285,7 +696,7 @@ func (p *P) CheckPolicy(access string, ev *event.E, loggedInPubkey []byte, ipAdd if rule.Script != "" && p.Manager != nil { if p.Manager.IsEnabled() { // Check if script file exists before trying to use it - if _, err := os.Stat(p.Manager.GetScriptPath()); err == nil { + if _, err := os.Stat(rule.Script); err == nil { // Script exists, try to use it allowed, err := p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress) if err == nil { @@ -482,16 +893,19 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log return p.getDefaultPolicyAction(), nil } - // Policy is enabled, check if it's running - if !p.Manager.IsRunning() { - // Check if script file exists - if _, err := os.Stat(p.Manager.GetScriptPath()); os.IsNotExist(err) { - // Script doesn't exist, return error so caller can fall back to other criteria - return false, fmt.Errorf("policy script does not exist at %s", p.Manager.GetScriptPath()) - } + // Check if script file exists + if _, err := os.Stat(scriptPath); os.IsNotExist(err) { + // Script doesn't exist, return error so caller can fall back to other criteria + return false, fmt.Errorf("policy script does not exist at %s", scriptPath) + } - // Try to start the policy and wait for it - if err := p.Manager.ensureRunning(); err != nil { + // Get or create a runner for this specific script path + runner := p.Manager.getOrCreateRunner(scriptPath) + + // Policy is enabled, check if this runner is running + if !runner.IsRunning() { + // Try to start this runner and wait for it + if err := runner.ensureRunning(); err != nil { // Startup failed, return error so caller can fall back to other criteria return false, fmt.Errorf("failed to start policy script: %v", err) } @@ -505,7 +919,7 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log } // Process event through policy script - response, scriptErr := p.Manager.ProcessEvent(policyEvent) + response, scriptErr := runner.ProcessEvent(policyEvent) if chk.E(scriptErr) { log.E.F("policy rule for kind %d failed (script processing error: %v), falling back to default policy (%s)", ev.Kind, scriptErr, p.DefaultPolicy) // Fall back to default policy on script failure @@ -527,413 +941,70 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log } } -// PolicyManager methods (similar to SprocketManager) +// PolicyManager methods -// periodicCheck periodically checks if policy script becomes available and attempts to restart failed scripts. -// Runs every 60 seconds (1 minute) to provide resilient script management. +// periodicCheck periodically checks if the default policy script becomes available. +// This is for backward compatibility with the default script path. func (pm *PolicyManager) periodicCheck() { - ticker := time.NewTicker(60 * time.Second) // Check every 60 seconds (1 minute) - defer ticker.Stop() - - for { - select { - case <-pm.ctx.Done(): - return - case <-ticker.C: - pm.mutex.RLock() - running := pm.isRunning - pm.mutex.RUnlock() - - // Check if policy script is not running and try to start it - if !running { - if _, err := os.Stat(pm.scriptPath); err == nil { - // Script exists but policy isn't running, try to start - go func() { - if err := pm.StartPolicy(); err != nil { - log.E.F("failed to restart policy: %v, will retry in next cycle", err) - } else { - log.I.F("policy restarted successfully") - } - }() - } - } - } - } + // Get or create runner for the default script path + // This will also start its own periodic check + pm.getOrCreateRunner(pm.scriptPath) } -// startPolicyIfExists starts the policy script if the file exists +// startPolicyIfExists starts the default policy script if the file exists. +// This is for backward compatibility with the default script path. func (pm *PolicyManager) startPolicyIfExists() { if _, err := os.Stat(pm.scriptPath); err == nil { - if err := pm.StartPolicy(); err != nil { - log.E.F("failed to start policy: %v, will retry periodically", err) - // Don't disable policy manager, just log the error and let periodic check retry + // Get or create runner for the default script, which will start it + runner := pm.getOrCreateRunner(pm.scriptPath) + if err := runner.Start(); err != nil { + log.E.F("failed to start default policy script: %v, will retry periodically", err) } } else { - log.W.F("policy script not found at %s, will retry periodically", pm.scriptPath) - // Don't disable policy manager, just log and let periodic check retry - } -} - -// ensureRunning ensures the policy is running, starting it if necessary. -// It waits for startup to complete with a timeout and returns an error if startup fails. -func (pm *PolicyManager) ensureRunning() error { - pm.mutex.Lock() - // Check if already running - if pm.isRunning { - pm.mutex.Unlock() - return nil - } - - // Check if already starting - if pm.isStarting { - pm.mutex.Unlock() - // Wait for startup to complete - select { - case err := <-pm.startupChan: - if err != nil { - return fmt.Errorf("policy startup failed: %v", err) - } - // Double-check it's actually running after receiving signal - pm.mutex.RLock() - running := pm.isRunning - pm.mutex.RUnlock() - if !running { - return fmt.Errorf("policy startup completed but process is not running") - } - return nil - case <-time.After(10 * time.Second): - return fmt.Errorf("policy startup timeout") - case <-pm.ctx.Done(): - return fmt.Errorf("policy context cancelled") - } - } - - // Mark as starting - pm.isStarting = true - pm.mutex.Unlock() - - // Start the policy in a goroutine - go func() { - err := pm.StartPolicy() - pm.mutex.Lock() - pm.isStarting = false - pm.mutex.Unlock() - // Signal startup completion (non-blocking) - // Drain any stale value first, then send - select { - case <-pm.startupChan: - default: - } - select { - case pm.startupChan <- err: - default: - // Channel should be empty now, but if it's full, try again - pm.startupChan <- err - } - }() - - // Wait for startup to complete - select { - case err := <-pm.startupChan: - if err != nil { - return fmt.Errorf("policy startup failed: %v", err) - } - // Double-check it's actually running after receiving signal - pm.mutex.RLock() - running := pm.isRunning - pm.mutex.RUnlock() - if !running { - return fmt.Errorf("policy startup completed but process is not running") - } - return nil - case <-time.After(10 * time.Second): - pm.mutex.Lock() - pm.isStarting = false - pm.mutex.Unlock() - return fmt.Errorf("policy startup timeout") - case <-pm.ctx.Done(): - pm.mutex.Lock() - pm.isStarting = false - pm.mutex.Unlock() - return fmt.Errorf("policy context cancelled") - } -} - -// StartPolicy starts the policy script process. -// Returns an error if the script doesn't exist, can't be executed, or is already running. -func (pm *PolicyManager) StartPolicy() error { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - if pm.isRunning { - return fmt.Errorf("policy is already running") - } - - if _, err := os.Stat(pm.scriptPath); os.IsNotExist(err) { - return fmt.Errorf("policy script does not exist") - } - - // Create a new context for this command - cmdCtx, cmdCancel := context.WithCancel(pm.ctx) - - // Make the script executable - if err := os.Chmod(pm.scriptPath, 0755); chk.E(err) { - cmdCancel() - return fmt.Errorf("failed to make script executable: %v", err) - } - - // Start the script - cmd := exec.CommandContext(cmdCtx, pm.scriptPath) - cmd.Dir = pm.configDir - - // Set up stdio pipes for communication - stdin, err := cmd.StdinPipe() - if chk.E(err) { - cmdCancel() - return fmt.Errorf("failed to create stdin pipe: %v", err) - } - - stdout, err := cmd.StdoutPipe() - if chk.E(err) { - cmdCancel() - stdin.Close() - return fmt.Errorf("failed to create stdout pipe: %v", err) - } - - stderr, err := cmd.StderrPipe() - if chk.E(err) { - cmdCancel() - stdin.Close() - stdout.Close() - return fmt.Errorf("failed to create stderr pipe: %v", err) - } - - // Start the command - if err := cmd.Start(); chk.E(err) { - cmdCancel() - stdin.Close() - stdout.Close() - stderr.Close() - return fmt.Errorf("failed to start policy: %v", err) - } - - pm.currentCmd = cmd - pm.currentCancel = cmdCancel - pm.stdin = stdin - pm.stdout = stdout - pm.stderr = stderr - pm.isRunning = true - - // Start response reader in background - go pm.readResponses() - - // Log stderr output in background - go pm.logOutput(stdout, stderr) - - // Monitor the process - go pm.monitorProcess() - - log.I.F("policy started (pid=%d)", cmd.Process.Pid) - return nil -} - -// StopPolicy stops the policy script gracefully with SIGTERM, falling back to SIGKILL if needed. -// Returns an error if the policy is not currently running. -func (pm *PolicyManager) StopPolicy() error { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - if !pm.isRunning || pm.currentCmd == nil { - return fmt.Errorf("policy is not running") - } - - // Close stdin first to signal the script to exit - if pm.stdin != nil { - pm.stdin.Close() - } - - // Cancel the context - if pm.currentCancel != nil { - pm.currentCancel() - } - - // Wait for graceful shutdown with timeout - done := make(chan error, 1) - go func() { - done <- pm.currentCmd.Wait() - }() - - select { - case <-done: - // Process exited gracefully - log.I.F("policy stopped gracefully") - case <-time.After(5 * time.Second): - // Force kill after 5 seconds - log.W.F("policy did not stop gracefully, sending SIGKILL") - if err := pm.currentCmd.Process.Kill(); chk.E(err) { - log.E.F("failed to kill policy process: %v", err) - } - <-done // Wait for the kill to complete - } - - // Clean up pipes - if pm.stdin != nil { - pm.stdin.Close() - pm.stdin = nil - } - if pm.stdout != nil { - pm.stdout.Close() - pm.stdout = nil - } - if pm.stderr != nil { - pm.stderr.Close() - pm.stderr = nil - } - - pm.isRunning = false - pm.currentCmd = nil - pm.currentCancel = nil - - return nil -} - -// ProcessEvent sends an event to the policy script and waits for a response. -// Returns the script's decision or an error if the script is not running or communication fails. -func (pm *PolicyManager) ProcessEvent(evt *PolicyEvent) (*PolicyResponse, error) { - pm.mutex.RLock() - if !pm.isRunning || pm.stdin == nil { - pm.mutex.RUnlock() - return nil, fmt.Errorf("policy is not running") - } - stdin := pm.stdin - pm.mutex.RUnlock() - - // Serialize the event to JSON - eventJSON, err := json.Marshal(evt) - if chk.E(err) { - return nil, fmt.Errorf("failed to serialize event: %v", err) - } - - // Send the event JSON to the policy script (newline-terminated for shell-readers) - if _, err := stdin.Write(append(eventJSON, '\n')); chk.E(err) { - return nil, fmt.Errorf("failed to write event to policy: %v", err) - } - - // Wait for response with timeout - select { - case response := <-pm.responseChan: - return &response, nil - case <-time.After(5 * time.Second): - return nil, fmt.Errorf("policy response timeout") - case <-pm.ctx.Done(): - return nil, fmt.Errorf("policy context cancelled") - } -} - -// readResponses reads JSONL responses from the policy script -func (pm *PolicyManager) readResponses() { - if pm.stdout == nil { - return - } - - scanner := bufio.NewScanner(pm.stdout) - for scanner.Scan() { - line := scanner.Text() - if line == "" { - continue - } - - var response PolicyResponse - if err := json.Unmarshal([]byte(line), &response); chk.E(err) { - log.E.F("failed to parse policy response: %v", err) - continue - } - - // Send response to channel (non-blocking) - select { - case pm.responseChan <- response: - default: - log.W.F("policy response channel full, dropping response") - } - } - - if err := scanner.Err(); chk.E(err) { - log.E.F("error reading policy responses: %v", err) - } -} - -// logOutput logs the output from stdout and stderr -func (pm *PolicyManager) logOutput(stdout, stderr io.ReadCloser) { - defer stderr.Close() - - // Only log stderr, stdout is used by readResponses - go func() { - io.Copy(os.Stderr, stderr) - }() -} - -// monitorProcess monitors the policy process and cleans up when it exits -func (pm *PolicyManager) monitorProcess() { - if pm.currentCmd == nil { - return - } - - err := pm.currentCmd.Wait() - - pm.mutex.Lock() - defer pm.mutex.Unlock() - - // Clean up pipes - if pm.stdin != nil { - pm.stdin.Close() - pm.stdin = nil - } - if pm.stdout != nil { - pm.stdout.Close() - pm.stdout = nil - } - if pm.stderr != nil { - pm.stderr.Close() - pm.stderr = nil - } - - pm.isRunning = false - pm.currentCmd = nil - pm.currentCancel = nil - - if err != nil { - log.E.F("policy process exited with error: %v, will retry periodically", err) - // Don't disable policy manager, let periodic check handle restart - log.W.F("policy script crashed - events will fall back to default policy until restart (script location: %s)", pm.scriptPath) - } else { - log.I.F("policy process exited normally") + log.W.F("default policy script not found at %s, will be started if it appears", pm.scriptPath) } } // IsEnabled returns whether the policy manager is enabled. -// This is set during initialization and doesn't change during runtime. func (pm *PolicyManager) IsEnabled() bool { return pm.enabled } -// IsRunning returns whether the policy script is currently running. -// This can change during runtime as scripts start, stop, or crash. +// IsRunning returns whether the default policy script is currently running. +// Deprecated: Use getOrCreateRunner(scriptPath).IsRunning() for specific scripts. func (pm *PolicyManager) IsRunning() bool { pm.mutex.RLock() defer pm.mutex.RUnlock() - return pm.isRunning + + // Check if default script runner exists and is running + if runner, exists := pm.runners[pm.scriptPath]; exists { + return runner.IsRunning() + } + return false } -// GetScriptPath returns the path to the policy script. +// GetScriptPath returns the default script path. func (pm *PolicyManager) GetScriptPath() string { return pm.scriptPath } -// Shutdown gracefully shuts down the policy manager. -// It cancels the context and stops any running policy script. +// Shutdown gracefully shuts down the policy manager and all running scripts. func (pm *PolicyManager) Shutdown() { pm.cancel() - if pm.isRunning { - pm.StopPolicy() + + pm.mutex.Lock() + defer pm.mutex.Unlock() + + // Stop all running scripts + for path, runner := range pm.runners { + if runner.IsRunning() { + log.I.F("stopping policy script: %s", path) + runner.Stop() + } + // Cancel the runner's context + runner.cancel() } + + // Clear runners map + pm.runners = make(map[string]*ScriptRunner) } diff --git a/pkg/policy/policy_test.go b/pkg/policy/policy_test.go index 2f11809..3c3fd57 100644 --- a/pkg/policy/policy_test.go +++ b/pkg/policy/policy_test.go @@ -715,12 +715,12 @@ func TestPolicyManagerLifecycle(t *testing.T) { defer cancel() manager := &PolicyManager{ - ctx: ctx, - cancel: cancel, - configDir: "/tmp", - scriptPath: "/tmp/policy.sh", - enabled: true, - responseChan: make(chan PolicyResponse, 100), + ctx: ctx, + cancel: cancel, + configDir: "/tmp", + scriptPath: "/tmp/policy.sh", + enabled: true, + runners: make(map[string]*ScriptRunner), } // Test manager state @@ -732,31 +732,37 @@ func TestPolicyManagerLifecycle(t *testing.T) { t.Error("Expected policy manager to not be running initially") } + // Test getting or creating a runner for a non-existent script + runner := manager.getOrCreateRunner("/tmp/policy.sh") + if runner == nil { + t.Fatal("Expected runner to be created") + } + // Test starting with non-existent script (should fail gracefully) - err := manager.StartPolicy() + err := runner.Start() if err == nil { - t.Error("Expected error when starting policy with non-existent script") + t.Error("Expected error when starting script with non-existent file") } // Test stopping when not running (should fail gracefully) - err = manager.StopPolicy() + err = runner.Stop() if err == nil { - t.Error("Expected error when stopping policy that's not running") + t.Error("Expected error when stopping script that's not running") } } func TestPolicyManagerProcessEvent(t *testing.T) { - // Test processing event when manager is not running (should fail gracefully) + // Test processing event when runner is not running (should fail gracefully) ctx, cancel := context.WithCancel(context.Background()) defer cancel() manager := &PolicyManager{ - ctx: ctx, - cancel: cancel, - configDir: "/tmp", - scriptPath: "/tmp/policy.sh", - enabled: true, - responseChan: make(chan PolicyResponse, 100), + ctx: ctx, + cancel: cancel, + configDir: "/tmp", + scriptPath: "/tmp/policy.sh", + enabled: true, + runners: make(map[string]*ScriptRunner), } // Generate real keypair for testing @@ -772,10 +778,13 @@ func TestPolicyManagerProcessEvent(t *testing.T) { IPAddress: "127.0.0.1", } + // Get or create a runner + runner := manager.getOrCreateRunner("/tmp/policy.sh") + // Process event when not running (should fail gracefully) - _, err := manager.ProcessEvent(policyEvent) + _, err := runner.ProcessEvent(policyEvent) if err == nil { - t.Error("Expected error when processing event with non-running policy manager") + t.Error("Expected error when processing event with non-running script") } } @@ -886,43 +895,53 @@ func TestEdgeCasesManagerWithInvalidScript(t *testing.T) { t.Fatalf("Failed to create invalid script: %v", err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + manager := &PolicyManager{ - ctx: ctx, - configDir: tempDir, - scriptPath: scriptPath, - enabled: true, - responseChan: make(chan PolicyResponse, 100), + ctx: ctx, + cancel: cancel, + configDir: tempDir, + scriptPath: scriptPath, + enabled: true, + runners: make(map[string]*ScriptRunner), } - // Should fail to start with invalid script - err = manager.StartPolicy() + // Get runner and try to start with invalid script + runner := manager.getOrCreateRunner(scriptPath) + err = runner.Start() if err == nil { - t.Error("Expected error when starting policy with invalid script") + t.Error("Expected error when starting invalid script") } } func TestEdgeCasesManagerDoubleStart(t *testing.T) { // Test double start without actually starting (simpler test) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + manager := &PolicyManager{ - ctx: ctx, - configDir: "/tmp", - scriptPath: "/tmp/policy.sh", - enabled: true, - responseChan: make(chan PolicyResponse, 100), + ctx: ctx, + cancel: cancel, + configDir: "/tmp", + scriptPath: "/tmp/policy.sh", + enabled: true, + runners: make(map[string]*ScriptRunner), } + // Get runner + runner := manager.getOrCreateRunner("/tmp/policy.sh") + // Try to start with non-existent script - should fail - err := manager.StartPolicy() + err := runner.Start() if err == nil { - t.Error("Expected error when starting policy manager with non-existent script") + t.Error("Expected error when starting script with non-existent file") } // Try to start again - should still fail - err = manager.StartPolicy() + err = runner.Start() if err == nil { - t.Error("Expected error when starting policy manager twice") + t.Error("Expected error when starting script twice") } } @@ -1150,8 +1169,8 @@ func TestScriptPolicyDisabledFallsBackToDefault(t *testing.T) { }, }, Manager: &PolicyManager{ - enabled: false, // Policy is disabled - isRunning: false, + enabled: false, // Policy is disabled + runners: make(map[string]*ScriptRunner), }, } @@ -1354,8 +1373,8 @@ func TestScriptProcessingDisabledFallsBackToDefault(t *testing.T) { }, }, Manager: &PolicyManager{ - enabled: false, // Policy is disabled - isRunning: false, + enabled: false, // Policy is disabled + runners: make(map[string]*ScriptRunner), }, } diff --git a/pkg/version/version b/pkg/version/version index 37eb51d..8cbcda9 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.27.0 \ No newline at end of file +v0.27.1 \ No newline at end of file