package tortoise import ( "bytes" "context" "errors" "os/exec" "sync" "time" ) const ( MAX_RESULTS = 100 ) // CommandResult holds the result of a shell command execution. type CommandResult struct { Command string ReturnCode int Output string ErrOutput string } // ShellRunner manages shell command execution. type ShellRunner struct { cmdQueue chan func() *CommandResult results chan *CommandResult stopChan chan struct{} running sync.WaitGroup callbacks chan func() shell string // Private field to store the shell mu sync.Mutex isStopped bool activeCmds map[*exec.Cmd]struct{} // Track active commands for cancellation } // NewShellRunner creates a new ShellRunner instance with the default shell (`sh`). func NewShellRunner() *ShellRunner { return NewShellRunnerWithShell("sh") } // NewShellRunnerWithShell creates a new ShellRunner with the specified shell. func NewShellRunnerWithShell(shell string) *ShellRunner { return &ShellRunner{ cmdQueue: make(chan func() *CommandResult), results: make(chan *CommandResult, MAX_RESULTS), stopChan: make(chan struct{}), callbacks: make(chan func()), shell: shell, activeCmds: make(map[*exec.Cmd]struct{}), isStopped: true, } } // Start begins processing shell commands asynchronously. func (sr *ShellRunner) Start() { go func() { for { select { case <-sr.stopChan: return case cmdFunc, ok := <-sr.cmdQueue: if !ok { return } sr.running.Add(1) go func() { result := cmdFunc() sr.results <- result sr.running.Done() }() case callback := <-sr.callbacks: callback() } } }() sr.isStopped = false } // AddCommand adds a shell command to be executed with an optional callback. // No commands can be added if the runner has been stopped or not yet started. // The callback is executed asynchronously after the command has completed. // The order of command execution and callback invocation can be expected to be preserved. func (sr *ShellRunner) AddCommand(command string, callback func(*CommandResult)) error { sr.mu.Lock() defer sr.mu.Unlock() if sr.isStopped { return errors.New("runner is stopped, cannot add new commands") } sr.cmdQueue <- func() *CommandResult { result := sr.executeCommand(command) if callback != nil { sr.callbacks <- func() { callback(result) } } return result } return nil } // GetResults retrieves the next available command result (non-blocking). func (sr *ShellRunner) GetResults() *CommandResult { select { case result := <-sr.results: return result default: return nil } } // Stop gracefully stops the ShellRunner, closing the command queue and waiting for all commands to finish. func (sr *ShellRunner) Stop() { sr.mu.Lock() if sr.isStopped { sr.mu.Unlock() return } sr.isStopped = true close(sr.cmdQueue) // No more commands can be added close(sr.stopChan) sr.mu.Unlock() sr.running.Wait() } // Kill stops the ShellRunner immediately, terminating all running commands. func (sr *ShellRunner) Kill() { sr.mu.Lock() if sr.isStopped { sr.mu.Unlock() return } sr.isStopped = true close(sr.cmdQueue) // Prevent further commands close(sr.stopChan) // Terminate all active commands for cmd := range sr.activeCmds { _ = cmd.Process.Kill() } sr.mu.Unlock() sr.running.Wait() } // KillWithTimeout attempts to stop the ShellRunner, killing commands if the duration is exceeded. func (sr *ShellRunner) KillWithTimeout(timeout time.Duration) error { done := make(chan struct{}) go func() { sr.Stop() close(done) }() select { case <-done: return nil case <-time.After(timeout): sr.Kill() return errors.New("commands killed due to timeout") } } // executeCommand runs a shell command asynchronously, capturing stdout, stderr, and return code. func (sr *ShellRunner) executeCommand(command string) *CommandResult { var outBuf, errBuf bytes.Buffer ctx := context.Background() cmd := exec.CommandContext(ctx, sr.shell, "-c", command) cmd.Stdout = &outBuf cmd.Stderr = &errBuf // Track the active command sr.mu.Lock() sr.activeCmds[cmd] = struct{}{} sr.mu.Unlock() err := cmd.Start() if err != nil { return &CommandResult{ Command: command, ReturnCode: -1, ErrOutput: err.Error(), } } err = cmd.Wait() // Remove from active commands sr.mu.Lock() delete(sr.activeCmds, cmd) sr.mu.Unlock() result := &CommandResult{ Command: command, Output: outBuf.String(), ErrOutput: errBuf.String(), } if err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) { result.ReturnCode = exitErr.ExitCode() } else { result.ReturnCode = -1 } } else { result.ReturnCode = 0 } return result }