tortoise/main.go

223 lines
4.7 KiB
Go

package tortoise
import (
"bytes"
"context"
"errors"
"os/exec"
"sync"
"time"
)
const (
MAX_RESULTS = 100
)
// CommandResult holds the result of a shell command execution.
type CommandResult struct {
Command string
ReturnCode int
Output string
ErrOutput string
}
// ShellRunner manages shell command execution.
type ShellRunner struct {
cmdQueue chan func() *CommandResult
results chan *CommandResult
stopChan chan struct{}
running sync.WaitGroup
callbacks chan func()
shell string // Private field to store the shell
mu sync.Mutex
isStopped bool
activeCmds map[*exec.Cmd]struct{} // Track active commands for cancellation
}
// NewShellRunner creates a new ShellRunner instance with the default shell (`sh`).
func NewShellRunner() *ShellRunner {
return NewShellRunnerWithShell("sh")
}
// NewShellRunnerWithShell creates a new ShellRunner with the specified shell.
func NewShellRunnerWithShell(shell string) *ShellRunner {
return &ShellRunner{
cmdQueue: make(chan func() *CommandResult),
results: make(chan *CommandResult, MAX_RESULTS),
stopChan: make(chan struct{}),
callbacks: make(chan func()),
shell: shell,
activeCmds: make(map[*exec.Cmd]struct{}),
isStopped: true,
}
}
// Start begins processing shell commands asynchronously.
func (sr *ShellRunner) Start() {
go func() {
for {
select {
case <-sr.stopChan:
return
case cmdFunc, ok := <-sr.cmdQueue:
if !ok {
return
}
sr.running.Add(1)
go func() {
result := cmdFunc()
sr.results <- result
sr.running.Done()
}()
case callback := <-sr.callbacks:
callback()
}
}
}()
sr.isStopped = false
}
// AddCommand adds a shell command to be executed with an optional callback.
// No commands can be added if the runner has been stopped or not yet started.
// The callback is executed asynchronously after the command has completed.
// The order of command execution and callback invocation can be expected to be preserved.
func (sr *ShellRunner) AddCommand(command string, callback func(*CommandResult)) error {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.isStopped {
return errors.New("runner is stopped, cannot add new commands")
}
sr.cmdQueue <- func() *CommandResult {
result := sr.executeCommand(command)
if callback != nil {
sr.callbacks <- func() {
callback(result)
}
}
return result
}
return nil
}
// GetResults retrieves the next available command result (non-blocking).
func (sr *ShellRunner) GetResults() *CommandResult {
select {
case result := <-sr.results:
return result
default:
return nil
}
}
// Stop gracefully stops the ShellRunner, closing the command queue and waiting for all commands to finish.
func (sr *ShellRunner) Stop() {
sr.mu.Lock()
if sr.isStopped {
sr.mu.Unlock()
return
}
sr.isStopped = true
close(sr.cmdQueue) // No more commands can be added
close(sr.stopChan)
sr.mu.Unlock()
sr.running.Wait()
}
// Kill stops the ShellRunner immediately, terminating all running commands.
func (sr *ShellRunner) Kill() {
sr.mu.Lock()
if sr.isStopped {
sr.mu.Unlock()
return
}
sr.isStopped = true
close(sr.cmdQueue) // Prevent further commands
close(sr.stopChan)
// Terminate all active commands
for cmd := range sr.activeCmds {
_ = cmd.Process.Kill()
}
sr.mu.Unlock()
sr.running.Wait()
}
// KillWithTimeout attempts to stop the ShellRunner, killing commands if the duration is exceeded.
func (sr *ShellRunner) KillWithTimeout(timeout time.Duration) error {
done := make(chan struct{})
go func() {
sr.Stop()
close(done)
}()
select {
case <-done:
return nil
case <-time.After(timeout):
sr.Kill()
return errors.New("commands killed due to timeout")
}
}
// executeCommand runs a shell command asynchronously, capturing stdout, stderr, and return code.
func (sr *ShellRunner) executeCommand(command string) *CommandResult {
var outBuf, errBuf bytes.Buffer
ctx := context.Background()
cmd := exec.CommandContext(ctx, sr.shell, "-c", command)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
// Track the active command
sr.mu.Lock()
sr.activeCmds[cmd] = struct{}{}
sr.mu.Unlock()
err := cmd.Start()
if err != nil {
return &CommandResult{
Command: command,
ReturnCode: -1,
ErrOutput: err.Error(),
}
}
err = cmd.Wait()
// Remove from active commands
sr.mu.Lock()
delete(sr.activeCmds, cmd)
sr.mu.Unlock()
result := &CommandResult{
Command: command,
Output: outBuf.String(),
ErrOutput: errBuf.String(),
}
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
result.ReturnCode = exitErr.ExitCode()
} else {
result.ReturnCode = -1
}
} else {
result.ReturnCode = 0
}
return result
}