create concurrent script runner per rule script
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled

bump to v0.27.1
This commit is contained in:
2025-11-10 10:56:02 +00:00
parent 84b7c0e11c
commit 09bcbac20d
4 changed files with 551 additions and 457 deletions

View File

@@ -104,21 +104,25 @@ done
b.Fatalf("Failed to create test script: %v", err)
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := &PolicyManager{
ctx: ctx,
configDir: tempDir,
scriptPath: scriptPath,
enabled: true,
responseChan: make(chan PolicyResponse, 100),
ctx: ctx,
cancel: cancel,
configDir: tempDir,
scriptPath: scriptPath,
enabled: true,
runners: make(map[string]*ScriptRunner),
}
// Start the policy manager
err = manager.StartPolicy()
// Get or create runner and start it
runner := manager.getOrCreateRunner(scriptPath)
err = runner.Start()
if err != nil {
b.Fatalf("Failed to start policy: %v", err)
b.Fatalf("Failed to start policy script: %v", err)
}
defer manager.StopPolicy()
defer runner.Stop()
// Give the script time to start
time.Sleep(100 * time.Millisecond)

View File

@@ -119,10 +119,9 @@ type PolicyResponse struct {
Msg string `json:"msg"` // NIP-20 response message (only used for reject)
}
// PolicyManager handles policy script execution and management.
// It manages the lifecycle of policy scripts, handles communication with them,
// and provides resilient operation with automatic restart capabilities.
type PolicyManager struct {
// ScriptRunner manages a single policy script process.
// Each unique script path gets its own independent runner with its own goroutine.
type ScriptRunner struct {
ctx context.Context
cancel context.CancelFunc
configDir string
@@ -132,7 +131,6 @@ type PolicyManager struct {
mutex sync.RWMutex
isRunning bool
isStarting bool
enabled bool
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
@@ -140,6 +138,20 @@ type PolicyManager struct {
startupChan chan error
}
// PolicyManager handles multiple policy script runners.
// It manages the lifecycle of policy scripts, handles communication with them,
// and provides resilient operation with automatic restart capabilities.
// Each unique script path gets its own ScriptRunner instance, created lazily
// by getOrCreateRunner and kept in the runners map.
type PolicyManager struct {
	ctx        context.Context          // parent context; each runner's context is derived from it
	cancel     context.CancelFunc       // invoked by Shutdown before the runners are stopped
	configDir  string                   // copied into every runner created by getOrCreateRunner
	scriptPath string                   // Default script path for backward compatibility
	enabled    bool                     // value reported by IsEnabled
	mutex      sync.RWMutex             // guards the runners map
	runners    map[string]*ScriptRunner // Map of script path -> runner
}
// P represents a complete policy configuration for a Nostr relay.
// It defines access control rules, kind filtering, and default behavior.
// Policies are evaluated in order: global rules, kind filtering, specific rules, then default policy.
@@ -199,13 +211,12 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P {
ctx, cancel := context.WithCancel(ctx)
manager := &PolicyManager{
ctx: ctx,
cancel: cancel,
configDir: configDir,
scriptPath: scriptPath,
enabled: enabled,
responseChan: make(chan PolicyResponse, 100), // Buffered channel for responses
startupChan: make(chan error, 1), // Channel for startup completion
ctx: ctx,
cancel: cancel,
configDir: configDir,
scriptPath: scriptPath,
enabled: enabled,
runners: make(map[string]*ScriptRunner),
}
// Load policy configuration from JSON file
@@ -231,6 +242,406 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P {
return policy
}
// getOrCreateRunner returns the runner registered for scriptPath, building and
// registering a new one the first time a path is seen.
//
// The whole lookup-or-insert runs under the manager's write lock, so at most
// one runner is ever created per path. A freshly created runner gets a context
// derived from the manager's context and has its periodicCheck loop launched
// in the background; the script itself is not started here.
func (pm *PolicyManager) getOrCreateRunner(scriptPath string) *ScriptRunner {
	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	existing, found := pm.runners[scriptPath]
	if found {
		return existing
	}

	ctx, cancel := context.WithCancel(pm.ctx)
	created := &ScriptRunner{
		ctx:          ctx,
		cancel:       cancel,
		configDir:    pm.configDir,
		scriptPath:   scriptPath,
		responseChan: make(chan PolicyResponse, 100),
		startupChan:  make(chan error, 1),
	}
	pm.runners[scriptPath] = created

	// The background loop ends when the runner's context is cancelled.
	go created.periodicCheck()

	return created
}
// ScriptRunner methods
// IsRunning reports whether the runner currently considers its script process
// to be alive. The flag is read under the runner's read lock.
func (sr *ScriptRunner) IsRunning() bool {
	sr.mutex.RLock()
	running := sr.isRunning
	sr.mutex.RUnlock()
	return running
}
// ensureRunning makes sure the script process is up, starting it if needed,
// and blocks until startup settles, 10 seconds pass, or the runner's context
// is cancelled.
//
// Exactly one caller (the "starter") launches Start; it receives the outcome
// on a channel private to that attempt. Callers that arrive while a start is
// in flight do not share that channel: they watch the isStarting/isRunning
// flags instead. Previously every caller received from the single-slot
// startupChan, so only one of them ever saw the result and the rest sat out
// the full timeout. The runner's startupChan field is no longer used here.
//
// Returns nil when the script is running, otherwise an error describing the
// failed start (wrapping Start's error for the starter), the timeout, or the
// cancelled context.
func (sr *ScriptRunner) ensureRunning() error {
	const startupTimeout = 10 * time.Second

	sr.mutex.Lock()
	if sr.isRunning {
		sr.mutex.Unlock()
		return nil
	}
	if sr.isStarting {
		// Someone else owns the start attempt; just wait for it to settle.
		sr.mutex.Unlock()
		return sr.awaitStartup(startupTimeout)
	}
	sr.isStarting = true
	sr.mutex.Unlock()

	// Buffered so the goroutine can always deliver and exit, even when this
	// caller has already given up on the timeout or cancellation path.
	result := make(chan error, 1)
	go func() {
		err := sr.Start()
		sr.mutex.Lock()
		sr.isStarting = false
		sr.mutex.Unlock()
		result <- err
	}()

	timeout := time.NewTimer(startupTimeout)
	defer timeout.Stop()

	select {
	case err := <-result:
		if err != nil {
			return fmt.Errorf("script startup failed: %w", err)
		}
		// Double-check: the process may already have exited again.
		sr.mutex.RLock()
		running := sr.isRunning
		sr.mutex.RUnlock()
		if !running {
			return fmt.Errorf("script startup completed but process is not running")
		}
		return nil
	case <-timeout.C:
		sr.mutex.Lock()
		sr.isStarting = false
		sr.mutex.Unlock()
		return fmt.Errorf("script startup timeout")
	case <-sr.ctx.Done():
		sr.mutex.Lock()
		sr.isStarting = false
		sr.mutex.Unlock()
		return fmt.Errorf("script context cancelled")
	}
}

// awaitStartup waits for a start attempt owned by another caller to settle by
// polling the runner's state flags, giving up after limit or on cancellation.
func (sr *ScriptRunner) awaitStartup(limit time.Duration) error {
	deadline := time.NewTimer(limit)
	defer deadline.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		sr.mutex.RLock()
		running, starting := sr.isRunning, sr.isStarting
		sr.mutex.RUnlock()

		if running {
			return nil
		}
		if !starting {
			// The attempt finished without leaving a running process.
			return fmt.Errorf("script startup failed: process is not running")
		}

		select {
		case <-poll.C:
		case <-deadline.C:
			return fmt.Errorf("script startup timeout")
		case <-sr.ctx.Done():
			return fmt.Errorf("script context cancelled")
		}
	}
}
// Start launches the script as a child process and wires up its stdio pipes.
//
// The runner's write lock is held for the whole call. Start fails if the
// runner is already marked running, if the script file does not exist, if it
// cannot be chmod'ed to 0755, or if pipe creation or process launch fails; on
// any failure after the per-command context is created, that context is
// cancelled and every pipe opened so far is closed. On success the process
// handle, cancel func and pipes are stored on the runner and three background
// goroutines are spawned: readResponses, logOutput and monitorProcess.
func (sr *ScriptRunner) Start() error {
	sr.mutex.Lock()
	defer sr.mutex.Unlock()

	if sr.isRunning {
		return fmt.Errorf("script is already running")
	}
	_, statErr := os.Stat(sr.scriptPath)
	if os.IsNotExist(statErr) {
		return fmt.Errorf("script does not exist at %s", sr.scriptPath)
	}

	// Each launch gets its own cancellable context derived from the runner's.
	procCtx, stopProc := context.WithCancel(sr.ctx)

	// abort undoes everything acquired so far, in acquisition order.
	var closers []func() error
	abort := func(format string, cause error) error {
		stopProc()
		for _, closeFn := range closers {
			closeFn()
		}
		return fmt.Errorf(format, cause)
	}

	if err := os.Chmod(sr.scriptPath, 0755); chk.E(err) {
		return abort("failed to make script executable: %v", err)
	}

	proc := exec.CommandContext(procCtx, sr.scriptPath)
	proc.Dir = sr.configDir

	in, err := proc.StdinPipe()
	if chk.E(err) {
		return abort("failed to create stdin pipe: %v", err)
	}
	closers = append(closers, in.Close)

	out, err := proc.StdoutPipe()
	if chk.E(err) {
		return abort("failed to create stdout pipe: %v", err)
	}
	closers = append(closers, out.Close)

	errOut, err := proc.StderrPipe()
	if chk.E(err) {
		return abort("failed to create stderr pipe: %v", err)
	}
	closers = append(closers, errOut.Close)

	if err := proc.Start(); chk.E(err) {
		return abort("failed to start script: %v", err)
	}

	sr.currentCmd = proc
	sr.currentCancel = stopProc
	sr.stdin = in
	sr.stdout = out
	sr.stderr = errOut
	sr.isRunning = true

	go sr.readResponses()        // parses JSONL replies from stdout
	go sr.logOutput(out, errOut) // forwards the script's stderr
	go sr.monitorProcess()       // reaps the process and resets state on exit

	log.I.F("policy script started: %s (pid=%d)", sr.scriptPath, proc.Process.Pid)
	return nil
}
// Stop shuts the running script down and waits until it has been reaped.
//
// It closes the script's stdin and cancels the per-command context. Start
// creates the command with exec.CommandContext, so that cancellation makes
// os/exec kill the process. Stop then waits up to 5 seconds for
// monitorProcess to reap the process and reset the runner's state; if that
// does not happen it kills the process explicitly and waits once more.
//
// Stop deliberately does not call Wait on the command itself: monitorProcess
// already does, and exec.Cmd.Wait must not be invoked twice (the second call
// races on the Cmd's internal state and returns a spurious error). The lock
// is also released while waiting so monitorProcess can perform its cleanup.
//
// Returns an error if the script is not running, or if the process is still
// not reaped after the explicit kill.
func (sr *ScriptRunner) Stop() error {
	const grace = 5 * time.Second

	sr.mutex.Lock()
	if !sr.isRunning || sr.currentCmd == nil {
		sr.mutex.Unlock()
		return fmt.Errorf("script is not running")
	}
	cmd := sr.currentCmd

	// Close stdin first to signal the script to exit
	if sr.stdin != nil {
		sr.stdin.Close()
	}
	// Cancel the command's context
	if sr.currentCancel != nil {
		sr.currentCancel()
	}
	sr.mutex.Unlock()

	if sr.awaitExit(cmd, grace) {
		log.I.F("policy script stopped: %s", sr.scriptPath)
		return nil
	}

	// Force kill after the grace period
	log.W.F("policy script did not stop gracefully, sending SIGKILL: %s", sr.scriptPath)
	if err := cmd.Process.Kill(); chk.E(err) {
		log.E.F("failed to kill script process: %v", err)
	}
	if !sr.awaitExit(cmd, grace) {
		return fmt.Errorf("script did not exit after SIGKILL: %s", sr.scriptPath)
	}
	return nil
}

// awaitExit polls until the runner no longer tracks cmd as its current command
// (monitorProcess clears it once the process is reaped) or limit elapses.
// It reports whether the command was released in time.
func (sr *ScriptRunner) awaitExit(cmd *exec.Cmd, limit time.Duration) bool {
	deadline := time.NewTimer(limit)
	defer deadline.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		sr.mutex.RLock()
		released := sr.currentCmd != cmd
		sr.mutex.RUnlock()
		if released {
			return true
		}
		select {
		case <-poll.C:
		case <-deadline.C:
			return false
		}
	}
}
// ProcessEvent hands one event to the script and waits for its verdict.
//
// The event is JSON-encoded and written to the script's stdin as a single
// newline-terminated line. The call then blocks until a response arrives on
// the runner's response channel, 5 seconds pass, or the runner's context is
// cancelled.
//
// NOTE(review): the response taken from the channel is simply the next one
// available; nothing here ties it to the event just written, so overlapping
// calls or a reply that arrives after a timeout can be attributed to the
// wrong event.
//
// Returns an error when the script is not running, when encoding or writing
// fails, on timeout, or on cancellation.
func (sr *ScriptRunner) ProcessEvent(evt *PolicyEvent) (*PolicyResponse, error) {
	// Snapshot the pipe under the read lock so the write happens lock-free.
	sr.mutex.RLock()
	live, pipe := sr.isRunning, sr.stdin
	sr.mutex.RUnlock()
	if !live || pipe == nil {
		return nil, fmt.Errorf("script is not running")
	}

	payload, err := json.Marshal(evt)
	if chk.E(err) {
		return nil, fmt.Errorf("failed to serialize event: %v", err)
	}

	// One event per line: the script reads newline-delimited JSON.
	payload = append(payload, '\n')
	if _, err = pipe.Write(payload); chk.E(err) {
		return nil, fmt.Errorf("failed to write event to script: %v", err)
	}

	select {
	case reply := <-sr.responseChan:
		return &reply, nil
	case <-time.After(5 * time.Second):
		return nil, fmt.Errorf("script response timeout")
	case <-sr.ctx.Done():
		return nil, fmt.Errorf("script context cancelled")
	}
}
// readResponses consumes the script's stdout line by line, decoding each
// non-empty line as a JSON PolicyResponse and forwarding it to the runner's
// response channel. Lines that fail to decode are logged and skipped; when the
// channel is full the response is dropped with a warning rather than blocking
// the reader. The loop ends when the scanner stops, and any scanner error is
// logged.
func (sr *ScriptRunner) readResponses() {
	if sr.stdout == nil {
		return
	}

	lines := bufio.NewScanner(sr.stdout)
	for lines.Scan() {
		raw := lines.Bytes()
		if len(raw) == 0 {
			continue
		}

		var parsed PolicyResponse
		if err := json.Unmarshal(raw, &parsed); chk.E(err) {
			log.E.F("failed to parse policy response from %s: %v", sr.scriptPath, err)
			continue
		}

		// Never block the reader on a slow consumer.
		select {
		case sr.responseChan <- parsed:
		default:
			log.W.F("policy response channel full for %s, dropping response", sr.scriptPath)
		}
	}

	if err := lines.Err(); chk.E(err) {
		log.E.F("error reading policy responses from %s: %v", sr.scriptPath, err)
	}
}
// logOutput forwards everything the script writes to stderr onto this
// process's stderr, then closes the pipe.
//
// The copy runs synchronously and the pipe is closed only after it finishes.
// The previous version started the copy in a nested goroutine while a deferred
// Close fired as soon as this function returned, so the pipe was closed before
// anything could be read from it. Start already invokes logOutput in its own
// goroutine, so blocking here does not hold up the caller.
//
// stdout is accepted for signature compatibility but is not touched:
// readResponses is its only reader.
func (sr *ScriptRunner) logOutput(stdout, stderr io.ReadCloser) {
	if stderr == nil {
		return
	}
	defer stderr.Close()

	// A read error is expected once the process has exited and the pipe has
	// been closed underneath us; there is nothing useful to do with it.
	_, _ = io.Copy(os.Stderr, stderr)
}
// monitorProcess waits for the script process to exit and then resets the
// runner's state so the script can be started again.
//
// The command to watch is captured once, under the read lock. Start holds the
// write lock while it spawns this goroutine, so the capture sees the command
// Start just stored; it also prevents a nil dereference if the field is
// cleared between a nil check and the Wait call.
//
// After Wait returns, the pipes and state flags are reset only if the runner
// still tracks the same command. If the field has been cleared or replaced in
// the meantime (for example by a stop followed by a restart), the state
// belongs to someone else and is left untouched. The exit is logged either
// way.
func (sr *ScriptRunner) monitorProcess() {
	sr.mutex.RLock()
	cmd := sr.currentCmd
	sr.mutex.RUnlock()
	if cmd == nil {
		return
	}

	err := cmd.Wait()

	sr.mutex.Lock()
	if sr.currentCmd == cmd {
		// Clean up pipes
		if sr.stdin != nil {
			sr.stdin.Close()
			sr.stdin = nil
		}
		if sr.stdout != nil {
			sr.stdout.Close()
			sr.stdout = nil
		}
		if sr.stderr != nil {
			sr.stderr.Close()
			sr.stderr = nil
		}
		sr.isRunning = false
		sr.currentCmd = nil
		sr.currentCancel = nil
	}
	sr.mutex.Unlock()

	if err != nil {
		log.E.F("policy script exited with error: %s: %v, will retry periodically", sr.scriptPath, err)
		return
	}
	log.I.F("policy script exited normally: %s", sr.scriptPath)
}
// periodicCheck is the runner's background watchdog. Every 60 seconds it looks
// at the running flag and, when the script is not running but its file can be
// stat'ed, kicks off a Start attempt in a separate goroutine and logs the
// outcome. The loop exits when the runner's context is cancelled.
func (sr *ScriptRunner) periodicCheck() {
	tick := time.NewTicker(60 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-sr.ctx.Done():
			return
		case <-tick.C:
		}

		sr.mutex.RLock()
		alive := sr.isRunning
		sr.mutex.RUnlock()
		if alive {
			continue
		}

		// Only attempt a restart when the script file is actually there.
		if _, statErr := os.Stat(sr.scriptPath); statErr != nil {
			continue
		}

		go func() {
			err := sr.Start()
			if err == nil {
				log.I.F("policy script restarted successfully: %s", sr.scriptPath)
				return
			}
			log.E.F("failed to restart policy script %s: %v, will retry in next cycle", sr.scriptPath, err)
		}()
	}
}
// LoadFromFile loads policy configuration from a JSON file.
// Returns an error if the file doesn't exist, can't be read, or contains invalid JSON.
func (p *P) LoadFromFile(configPath string) error {
@@ -285,7 +696,7 @@ func (p *P) CheckPolicy(access string, ev *event.E, loggedInPubkey []byte, ipAdd
if rule.Script != "" && p.Manager != nil {
if p.Manager.IsEnabled() {
// Check if script file exists before trying to use it
if _, err := os.Stat(p.Manager.GetScriptPath()); err == nil {
if _, err := os.Stat(rule.Script); err == nil {
// Script exists, try to use it
allowed, err := p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress)
if err == nil {
@@ -482,16 +893,19 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log
return p.getDefaultPolicyAction(), nil
}
// Policy is enabled, check if it's running
if !p.Manager.IsRunning() {
// Check if script file exists
if _, err := os.Stat(p.Manager.GetScriptPath()); os.IsNotExist(err) {
// Script doesn't exist, return error so caller can fall back to other criteria
return false, fmt.Errorf("policy script does not exist at %s", p.Manager.GetScriptPath())
}
// Check if script file exists
if _, err := os.Stat(scriptPath); os.IsNotExist(err) {
// Script doesn't exist, return error so caller can fall back to other criteria
return false, fmt.Errorf("policy script does not exist at %s", scriptPath)
}
// Try to start the policy and wait for it
if err := p.Manager.ensureRunning(); err != nil {
// Get or create a runner for this specific script path
runner := p.Manager.getOrCreateRunner(scriptPath)
// Policy is enabled, check if this runner is running
if !runner.IsRunning() {
// Try to start this runner and wait for it
if err := runner.ensureRunning(); err != nil {
// Startup failed, return error so caller can fall back to other criteria
return false, fmt.Errorf("failed to start policy script: %v", err)
}
@@ -505,7 +919,7 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log
}
// Process event through policy script
response, scriptErr := p.Manager.ProcessEvent(policyEvent)
response, scriptErr := runner.ProcessEvent(policyEvent)
if chk.E(scriptErr) {
log.E.F("policy rule for kind %d failed (script processing error: %v), falling back to default policy (%s)", ev.Kind, scriptErr, p.DefaultPolicy)
// Fall back to default policy on script failure
@@ -527,413 +941,70 @@ func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, log
}
}
// PolicyManager methods (similar to SprocketManager)
// PolicyManager methods
// periodicCheck periodically checks if policy script becomes available and attempts to restart failed scripts.
// Runs every 60 seconds (1 minute) to provide resilient script management.
// periodicCheck periodically checks if the default policy script becomes available.
// This is for backward compatibility with the default script path.
func (pm *PolicyManager) periodicCheck() {
ticker := time.NewTicker(60 * time.Second) // Check every 60 seconds (1 minute)
defer ticker.Stop()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
// Check if policy script is not running and try to start it
if !running {
if _, err := os.Stat(pm.scriptPath); err == nil {
// Script exists but policy isn't running, try to start
go func() {
if err := pm.StartPolicy(); err != nil {
log.E.F("failed to restart policy: %v, will retry in next cycle", err)
} else {
log.I.F("policy restarted successfully")
}
}()
}
}
}
}
// Get or create runner for the default script path
// This will also start its own periodic check
pm.getOrCreateRunner(pm.scriptPath)
}
// startPolicyIfExists starts the policy script if the file exists
// startPolicyIfExists starts the default policy script if the file exists.
// This is for backward compatibility with the default script path.
func (pm *PolicyManager) startPolicyIfExists() {
if _, err := os.Stat(pm.scriptPath); err == nil {
if err := pm.StartPolicy(); err != nil {
log.E.F("failed to start policy: %v, will retry periodically", err)
// Don't disable policy manager, just log the error and let periodic check retry
// Get or create runner for the default script, which will start it
runner := pm.getOrCreateRunner(pm.scriptPath)
if err := runner.Start(); err != nil {
log.E.F("failed to start default policy script: %v, will retry periodically", err)
}
} else {
log.W.F("policy script not found at %s, will retry periodically", pm.scriptPath)
// Don't disable policy manager, just log and let periodic check retry
}
}
// ensureRunning ensures the policy is running, starting it if necessary.
// It waits for startup to complete with a timeout and returns an error if startup fails.
func (pm *PolicyManager) ensureRunning() error {
pm.mutex.Lock()
// Check if already running
if pm.isRunning {
pm.mutex.Unlock()
return nil
}
// Check if already starting
if pm.isStarting {
pm.mutex.Unlock()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
return fmt.Errorf("policy context cancelled")
}
}
// Mark as starting
pm.isStarting = true
pm.mutex.Unlock()
// Start the policy in a goroutine
go func() {
err := pm.StartPolicy()
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
// Signal startup completion (non-blocking)
// Drain any stale value first, then send
select {
case <-pm.startupChan:
default:
}
select {
case pm.startupChan <- err:
default:
// Channel should be empty now, but if it's full, try again
pm.startupChan <- err
}
}()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy context cancelled")
}
}
// StartPolicy starts the policy script process.
// Returns an error if the script doesn't exist, can't be executed, or is already running.
func (pm *PolicyManager) StartPolicy() error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if pm.isRunning {
return fmt.Errorf("policy is already running")
}
if _, err := os.Stat(pm.scriptPath); os.IsNotExist(err) {
return fmt.Errorf("policy script does not exist")
}
// Create a new context for this command
cmdCtx, cmdCancel := context.WithCancel(pm.ctx)
// Make the script executable
if err := os.Chmod(pm.scriptPath, 0755); chk.E(err) {
cmdCancel()
return fmt.Errorf("failed to make script executable: %v", err)
}
// Start the script
cmd := exec.CommandContext(cmdCtx, pm.scriptPath)
cmd.Dir = pm.configDir
// Set up stdio pipes for communication
stdin, err := cmd.StdinPipe()
if chk.E(err) {
cmdCancel()
return fmt.Errorf("failed to create stdin pipe: %v", err)
}
stdout, err := cmd.StdoutPipe()
if chk.E(err) {
cmdCancel()
stdin.Close()
return fmt.Errorf("failed to create stdout pipe: %v", err)
}
stderr, err := cmd.StderrPipe()
if chk.E(err) {
cmdCancel()
stdin.Close()
stdout.Close()
return fmt.Errorf("failed to create stderr pipe: %v", err)
}
// Start the command
if err := cmd.Start(); chk.E(err) {
cmdCancel()
stdin.Close()
stdout.Close()
stderr.Close()
return fmt.Errorf("failed to start policy: %v", err)
}
pm.currentCmd = cmd
pm.currentCancel = cmdCancel
pm.stdin = stdin
pm.stdout = stdout
pm.stderr = stderr
pm.isRunning = true
// Start response reader in background
go pm.readResponses()
// Log stderr output in background
go pm.logOutput(stdout, stderr)
// Monitor the process
go pm.monitorProcess()
log.I.F("policy started (pid=%d)", cmd.Process.Pid)
return nil
}
// StopPolicy stops the policy script gracefully with SIGTERM, falling back to SIGKILL if needed.
// Returns an error if the policy is not currently running.
func (pm *PolicyManager) StopPolicy() error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if !pm.isRunning || pm.currentCmd == nil {
return fmt.Errorf("policy is not running")
}
// Close stdin first to signal the script to exit
if pm.stdin != nil {
pm.stdin.Close()
}
// Cancel the context
if pm.currentCancel != nil {
pm.currentCancel()
}
// Wait for graceful shutdown with timeout
done := make(chan error, 1)
go func() {
done <- pm.currentCmd.Wait()
}()
select {
case <-done:
// Process exited gracefully
log.I.F("policy stopped gracefully")
case <-time.After(5 * time.Second):
// Force kill after 5 seconds
log.W.F("policy did not stop gracefully, sending SIGKILL")
if err := pm.currentCmd.Process.Kill(); chk.E(err) {
log.E.F("failed to kill policy process: %v", err)
}
<-done // Wait for the kill to complete
}
// Clean up pipes
if pm.stdin != nil {
pm.stdin.Close()
pm.stdin = nil
}
if pm.stdout != nil {
pm.stdout.Close()
pm.stdout = nil
}
if pm.stderr != nil {
pm.stderr.Close()
pm.stderr = nil
}
pm.isRunning = false
pm.currentCmd = nil
pm.currentCancel = nil
return nil
}
// ProcessEvent sends an event to the policy script and waits for a response.
// Returns the script's decision or an error if the script is not running or communication fails.
func (pm *PolicyManager) ProcessEvent(evt *PolicyEvent) (*PolicyResponse, error) {
pm.mutex.RLock()
if !pm.isRunning || pm.stdin == nil {
pm.mutex.RUnlock()
return nil, fmt.Errorf("policy is not running")
}
stdin := pm.stdin
pm.mutex.RUnlock()
// Serialize the event to JSON
eventJSON, err := json.Marshal(evt)
if chk.E(err) {
return nil, fmt.Errorf("failed to serialize event: %v", err)
}
// Send the event JSON to the policy script (newline-terminated for shell-readers)
if _, err := stdin.Write(append(eventJSON, '\n')); chk.E(err) {
return nil, fmt.Errorf("failed to write event to policy: %v", err)
}
// Wait for response with timeout
select {
case response := <-pm.responseChan:
return &response, nil
case <-time.After(5 * time.Second):
return nil, fmt.Errorf("policy response timeout")
case <-pm.ctx.Done():
return nil, fmt.Errorf("policy context cancelled")
}
}
// readResponses reads JSONL responses from the policy script
func (pm *PolicyManager) readResponses() {
if pm.stdout == nil {
return
}
scanner := bufio.NewScanner(pm.stdout)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var response PolicyResponse
if err := json.Unmarshal([]byte(line), &response); chk.E(err) {
log.E.F("failed to parse policy response: %v", err)
continue
}
// Send response to channel (non-blocking)
select {
case pm.responseChan <- response:
default:
log.W.F("policy response channel full, dropping response")
}
}
if err := scanner.Err(); chk.E(err) {
log.E.F("error reading policy responses: %v", err)
}
}
// logOutput logs the output from stdout and stderr
func (pm *PolicyManager) logOutput(stdout, stderr io.ReadCloser) {
defer stderr.Close()
// Only log stderr, stdout is used by readResponses
go func() {
io.Copy(os.Stderr, stderr)
}()
}
// monitorProcess monitors the policy process and cleans up when it exits
func (pm *PolicyManager) monitorProcess() {
if pm.currentCmd == nil {
return
}
err := pm.currentCmd.Wait()
pm.mutex.Lock()
defer pm.mutex.Unlock()
// Clean up pipes
if pm.stdin != nil {
pm.stdin.Close()
pm.stdin = nil
}
if pm.stdout != nil {
pm.stdout.Close()
pm.stdout = nil
}
if pm.stderr != nil {
pm.stderr.Close()
pm.stderr = nil
}
pm.isRunning = false
pm.currentCmd = nil
pm.currentCancel = nil
if err != nil {
log.E.F("policy process exited with error: %v, will retry periodically", err)
// Don't disable policy manager, let periodic check handle restart
log.W.F("policy script crashed - events will fall back to default policy until restart (script location: %s)", pm.scriptPath)
} else {
log.I.F("policy process exited normally")
log.W.F("default policy script not found at %s, will be started if it appears", pm.scriptPath)
}
}
// IsEnabled reports whether the policy manager is enabled.
// The flag is read without taking the manager's lock.
// NOTE(review): it appears to be set only at construction time and never
// changed afterwards — confirm no other writer exists before relying on that.
func (pm *PolicyManager) IsEnabled() bool {
	return pm.enabled
}
// IsRunning reports whether the default policy script is currently running.
// It looks up the runner registered under the manager's default script path
// and returns false when no such runner has been created yet.
//
// A leftover `return pm.isRunning` from the single-script design has been
// dropped: PolicyManager no longer has that field, and the statement made the
// runner lookup unreachable.
//
// Deprecated: Use getOrCreateRunner(scriptPath).IsRunning() for specific scripts.
func (pm *PolicyManager) IsRunning() bool {
	// Hold the manager lock only for the map lookup; the runner has its own lock.
	pm.mutex.RLock()
	runner, exists := pm.runners[pm.scriptPath]
	pm.mutex.RUnlock()

	if !exists {
		return false
	}
	return runner.IsRunning()
}
// GetScriptPath returns the manager's default script path, i.e. the
// scriptPath field kept for backward compatibility. Scripts referenced by
// individual rules are keyed by their own paths in the runners map instead.
func (pm *PolicyManager) GetScriptPath() string {
	return pm.scriptPath
}
// Shutdown gracefully shuts down the policy manager.
// It cancels the context and stops any running policy script.
// Shutdown gracefully shuts down the policy manager and all running scripts.
func (pm *PolicyManager) Shutdown() {
pm.cancel()
if pm.isRunning {
pm.StopPolicy()
pm.mutex.Lock()
defer pm.mutex.Unlock()
// Stop all running scripts
for path, runner := range pm.runners {
if runner.IsRunning() {
log.I.F("stopping policy script: %s", path)
runner.Stop()
}
// Cancel the runner's context
runner.cancel()
}
// Clear runners map
pm.runners = make(map[string]*ScriptRunner)
}

View File

@@ -715,12 +715,12 @@ func TestPolicyManagerLifecycle(t *testing.T) {
defer cancel()
manager := &PolicyManager{
ctx: ctx,
cancel: cancel,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
responseChan: make(chan PolicyResponse, 100),
ctx: ctx,
cancel: cancel,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
runners: make(map[string]*ScriptRunner),
}
// Test manager state
@@ -732,31 +732,37 @@ func TestPolicyManagerLifecycle(t *testing.T) {
t.Error("Expected policy manager to not be running initially")
}
// Test getting or creating a runner for a non-existent script
runner := manager.getOrCreateRunner("/tmp/policy.sh")
if runner == nil {
t.Fatal("Expected runner to be created")
}
// Test starting with non-existent script (should fail gracefully)
err := manager.StartPolicy()
err := runner.Start()
if err == nil {
t.Error("Expected error when starting policy with non-existent script")
t.Error("Expected error when starting script with non-existent file")
}
// Test stopping when not running (should fail gracefully)
err = manager.StopPolicy()
err = runner.Stop()
if err == nil {
t.Error("Expected error when stopping policy that's not running")
t.Error("Expected error when stopping script that's not running")
}
}
func TestPolicyManagerProcessEvent(t *testing.T) {
// Test processing event when manager is not running (should fail gracefully)
// Test processing event when runner is not running (should fail gracefully)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := &PolicyManager{
ctx: ctx,
cancel: cancel,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
responseChan: make(chan PolicyResponse, 100),
ctx: ctx,
cancel: cancel,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
runners: make(map[string]*ScriptRunner),
}
// Generate real keypair for testing
@@ -772,10 +778,13 @@ func TestPolicyManagerProcessEvent(t *testing.T) {
IPAddress: "127.0.0.1",
}
// Get or create a runner
runner := manager.getOrCreateRunner("/tmp/policy.sh")
// Process event when not running (should fail gracefully)
_, err := manager.ProcessEvent(policyEvent)
_, err := runner.ProcessEvent(policyEvent)
if err == nil {
t.Error("Expected error when processing event with non-running policy manager")
t.Error("Expected error when processing event with non-running script")
}
}
@@ -886,43 +895,53 @@ func TestEdgeCasesManagerWithInvalidScript(t *testing.T) {
t.Fatalf("Failed to create invalid script: %v", err)
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := &PolicyManager{
ctx: ctx,
configDir: tempDir,
scriptPath: scriptPath,
enabled: true,
responseChan: make(chan PolicyResponse, 100),
ctx: ctx,
cancel: cancel,
configDir: tempDir,
scriptPath: scriptPath,
enabled: true,
runners: make(map[string]*ScriptRunner),
}
// Should fail to start with invalid script
err = manager.StartPolicy()
// Get runner and try to start with invalid script
runner := manager.getOrCreateRunner(scriptPath)
err = runner.Start()
if err == nil {
t.Error("Expected error when starting policy with invalid script")
t.Error("Expected error when starting invalid script")
}
}
func TestEdgeCasesManagerDoubleStart(t *testing.T) {
// Test double start without actually starting (simpler test)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := &PolicyManager{
ctx: ctx,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
responseChan: make(chan PolicyResponse, 100),
ctx: ctx,
cancel: cancel,
configDir: "/tmp",
scriptPath: "/tmp/policy.sh",
enabled: true,
runners: make(map[string]*ScriptRunner),
}
// Get runner
runner := manager.getOrCreateRunner("/tmp/policy.sh")
// Try to start with non-existent script - should fail
err := manager.StartPolicy()
err := runner.Start()
if err == nil {
t.Error("Expected error when starting policy manager with non-existent script")
t.Error("Expected error when starting script with non-existent file")
}
// Try to start again - should still fail
err = manager.StartPolicy()
err = runner.Start()
if err == nil {
t.Error("Expected error when starting policy manager twice")
t.Error("Expected error when starting script twice")
}
}
@@ -1150,8 +1169,8 @@ func TestScriptPolicyDisabledFallsBackToDefault(t *testing.T) {
},
},
Manager: &PolicyManager{
enabled: false, // Policy is disabled
isRunning: false,
enabled: false, // Policy is disabled
runners: make(map[string]*ScriptRunner),
},
}
@@ -1354,8 +1373,8 @@ func TestScriptProcessingDisabledFallsBackToDefault(t *testing.T) {
},
},
Manager: &PolicyManager{
enabled: false, // Policy is disabled
isRunning: false,
enabled: false, // Policy is disabled
runners: make(map[string]*ScriptRunner),
},
}