// tortoise/main.go
//
// 243 lines
// 5.2 KiB
// Go

package tortoise
import (
"bytes"
"context"
"errors"
"os/exec"
"sync"
"time"
)
const (
// MAX_RESULTS is the capacity of the buffered results channel created by
// NewShellRunnerWithShell, i.e. how many finished results can wait to be
// collected through GetResults before command goroutines block on sending.
MAX_RESULTS = 100
)
// CommandResult holds the result of a shell command execution.
type CommandResult struct {
// Command is the command line as rendered by exec.Cmd.String.
Command string
// ReturnCode is 0 on success, the value of exec.ExitError.ExitCode when the
// command exited with an error, and -1 when the command could not be started
// or failed with any other kind of error.
ReturnCode int
// Output is the captured standard output.
Output string
// ErrOutput is the captured standard error, or the start error's message
// when the command could not be started.
ErrOutput string
}
// ShellRunner manages shell command execution.
//
// Commands are queued with AddCommand or AddCmd, picked up by the dispatcher
// goroutine launched by Start, and each run in a goroutine of its own.
// Finished results are published on a buffered channel read by GetResults.
type ShellRunner struct {
// cmdQueue is the unbuffered hand-off of command closures from AddCmd to the
// dispatcher goroutine; Stop closes it.
cmdQueue chan func() *CommandResult
// results buffers up to MAX_RESULTS finished results for GetResults.
results chan *CommandResult
// stopChan is closed by Stop to tell the dispatcher goroutine to exit.
stopChan chan struct{}
// running counts in-flight command goroutines and pending callbacks; Stop
// waits on it.
running sync.WaitGroup
// callbacks is the unbuffered channel through which completed commands hand
// their callback to the dispatcher goroutine, which runs them one at a time.
callbacks chan func()
shell string // Shell binary that newShellCommand invokes as `<shell> -c <command>`
// mu is held by AddCmd, Stop, Kill and executeCommand while they read or
// modify isStopped and activeCmds.
mu sync.Mutex
// isStopped is true for a freshly constructed runner, cleared by Start and
// set again by Stop; AddCmd rejects commands while it is true.
isStopped bool
activeCmds map[*exec.Cmd]struct{} // Track active commands for cancellation
// timeout is the per-command timeout applied by newShellCommand; zero or a
// negative value means no timeout.
timeout time.Duration
}
// NewShellRunner returns a ShellRunner that runs its commands through `sh`.
// It is shorthand for NewShellRunnerWithShell("sh").
func NewShellRunner() *ShellRunner {
	const defaultShell = "sh"
	return NewShellRunnerWithShell(defaultShell)
}
// NewShellRunnerWithShell builds a ShellRunner whose commands are executed
// through the given shell binary.
//
// The runner is returned in the stopped state with no timeout configured; call
// Start before adding commands.
func NewShellRunnerWithShell(shell string) *ShellRunner {
	runner := new(ShellRunner)

	// Configuration.
	runner.shell = shell
	runner.timeout = 0
	runner.isStopped = true

	// Unbuffered hand-off channels plus the stop signal.
	runner.cmdQueue = make(chan func() *CommandResult)
	runner.callbacks = make(chan func())
	runner.stopChan = make(chan struct{})

	// Finished results wait here until GetResults collects them.
	runner.results = make(chan *CommandResult, MAX_RESULTS)

	runner.activeCmds = make(map[*exec.Cmd]struct{})
	return runner
}
// SetTimeout sets the timeout that newShellCommand applies when AddCommand
// builds a command; zero or a negative value means no timeout. Only commands
// created after the call are affected, and commands passed directly to AddCmd
// keep whatever context they were built with.
// NOTE(review): the field is written here without holding sr.mu, and
// newShellCommand reads it the same way, so calling SetTimeout concurrently
// with AddCommand looks racy — confirm callers configure it up front.
func (sr *ShellRunner) SetTimeout(timeout time.Duration) {
sr.timeout = timeout
}
// Start launches the dispatcher goroutine and marks the runner as ready to
// accept commands.
//
// The dispatcher starts every queued command in a goroutine of its own and runs
// callbacks one at a time. Each finished result is sent on the results channel,
// which holds MAX_RESULTS entries; once it is full, command goroutines block
// until GetResults makes room.
//
// Calling Start on a runner that is already running, or on one that has been
// stopped, does nothing: Stop closes the runner's channels, so a stopped runner
// cannot be restarted.
func (sr *ShellRunner) Start() {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	// A second Start on a live runner would spawn a competing dispatcher.
	if !sr.isStopped {
		return
	}
	// After Stop the channels are closed for good; clearing isStopped again
	// would let AddCmd send on a closed channel and panic.
	select {
	case <-sr.stopChan:
		return
	default:
	}
	// Written under mu because AddCmd and Stop read the flag under mu.
	sr.isStopped = false

	go func() {
		// finish keeps serving callbacks after a stop request until every
		// in-flight command and callback is done. Commands hand their callback
		// over on an unbuffered channel, so leaving earlier would block them
		// forever and Stop's wait would never return.
		finish := func() {
			idle := make(chan struct{})
			go func() {
				sr.running.Wait()
				close(idle)
			}()
			for {
				select {
				case callback := <-sr.callbacks:
					callback()
				case <-idle:
					return
				}
			}
		}

		for {
			select {
			case <-sr.stopChan:
				finish()
				return
			case cmdFunc, ok := <-sr.cmdQueue:
				if !ok {
					finish()
					return
				}
				sr.running.Add(1)
				go func() {
					defer sr.running.Done()
					sr.results <- cmdFunc()
				}()
			case callback := <-sr.callbacks:
				callback()
			}
		}
	}()
}
// AddCommand queues a shell command line for execution, with an optional
// callback that receives the result once the command has completed.
//
// It returns an error if the runner has been stopped or not yet started.
// Commands are handed to the dispatcher in the order they are added, but each
// one runs in a goroutine of its own, so commands execute concurrently and
// callbacks are invoked in the order the commands complete.
//
// When a timeout is configured, its context is created here, so the deadline
// is measured from the moment the command is added.
func (sr *ShellRunner) AddCommand(command string, callback func(*CommandResult)) error {
	cmd, cancel := sr.newShellCommand(command)
	err := sr.AddCmd(cmd, callback, cancel)
	if err != nil && cancel != nil {
		// The command was rejected, so executeCommand will never run its
		// deferred cancel; release the timeout context here instead.
		cancel()
	}
	return err
}
// AddCmd queues a prepared command for execution.
//
// callback, when non-nil, is handed to the dispatcher goroutine after the
// command has completed and is called there with the result. cancel, when
// non-nil, is called once the command has finished executing; it is not
// called when AddCmd returns an error, so the caller keeps ownership of it in
// that case.
//
// AddCmd returns an error if cmd is nil or if the runner has been stopped or
// not yet started. The call blocks until the dispatcher has accepted the
// command. Do not call AddCmd or AddCommand from inside a callback: callbacks
// run on the dispatcher goroutine, which is also the only receiver of the
// command queue, so the send would never complete.
func (sr *ShellRunner) AddCmd(cmd *exec.Cmd, callback func(*CommandResult), cancel context.CancelFunc) error {
	// Reject a nil command here; otherwise it would panic later inside a
	// background goroutine and take the whole process down.
	if cmd == nil {
		return errors.New("cannot add a nil command")
	}

	// The lock is held across the send so that Stop cannot close cmdQueue
	// while the send is in progress.
	sr.mu.Lock()
	defer sr.mu.Unlock()
	if sr.isStopped {
		return errors.New("runner is stopped, cannot add new commands")
	}

	sr.cmdQueue <- func() *CommandResult {
		result := sr.executeCommand(cmd, cancel)
		if callback != nil {
			// Count the pending callback so Stop waits for it as well.
			sr.running.Add(1)
			sr.callbacks <- func() {
				callback(result)
				sr.running.Done()
			}
		}
		return result
	}
	return nil
}
// GetResults returns the next finished command result without blocking, or
// nil when no result is currently waiting.
func (sr *ShellRunner) GetResults() *CommandResult {
	var next *CommandResult
	select {
	case next = <-sr.results:
	default:
		// Nothing buffered; next stays nil.
	}
	return next
}
// Stop shuts the runner down: it marks the runner as stopped, closes the
// command queue and the stop channel, and then waits for everything counted in
// the running wait group. Calling Stop on a runner that is already stopped (or
// was never started) returns immediately.
func (sr *ShellRunner) Stop() {
	// The state change and the channel closes happen under the lock; the wait
	// happens after it has been released.
	firstStop := func() bool {
		sr.mu.Lock()
		defer sr.mu.Unlock()

		if sr.isStopped {
			return false
		}
		sr.isStopped = true
		close(sr.cmdQueue) // No more commands can be added
		close(sr.stopChan)
		return true
	}()

	if firstStop {
		sr.running.Wait()
	}
}
// Kill terminates every tracked command that has a running process and then
// stops the runner via Stop.
func (sr *ShellRunner) Kill() {
	sr.mu.Lock()
	for cmd := range sr.activeCmds {
		// Process stays nil until the command has actually been started, and
		// remains nil when starting failed; there is nothing to kill then.
		if cmd.Process == nil {
			continue
		}
		// Best effort: Kill reports an error when the process has already
		// exited, which is fine here.
		_ = cmd.Process.Kill()
	}
	sr.mu.Unlock()

	sr.Stop()
}
// KillWithTimeout runs Stop in the background and gives it the given duration
// to complete. If Stop returns in time the result is nil; otherwise Kill is
// called and an error reporting the forced kill is returned.
func (sr *ShellRunner) KillWithTimeout(timeout time.Duration) error {
	stopped := make(chan struct{})
	go func() {
		sr.Stop()
		close(stopped)
	}()

	deadline := time.After(timeout)
	select {
	case <-deadline:
		// Stop did not finish in time: fall back to killing the commands.
		sr.Kill()
		return errors.New("commands killed due to timeout")
	case <-stopped:
		return nil
	}
}
// newShellCommand builds the exec.Cmd that runs command as
// `<shell> -c <command>`.
//
// With a positive timeout configured, the command is bound to a context that
// expires after that duration and the matching cancel function is returned.
// Otherwise the command uses the background context and the returned cancel
// function is nil.
func (sr *ShellRunner) newShellCommand(command string) (*exec.Cmd, context.CancelFunc) {
	if sr.timeout <= 0 {
		return exec.CommandContext(context.Background(), sr.shell, "-c", command), nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), sr.timeout)
	return exec.CommandContext(ctx, sr.shell, "-c", command), cancel
}
// executeCommand runs cmd to completion and returns its captured stdout,
// stderr and return code. The call blocks until the command has exited.
//
// cancel, when non-nil, is called before returning so the command's context is
// released. The command's Stdout and Stderr are replaced with in-memory
// buffers.
//
// ReturnCode is 0 on success, the exit code reported by exec.ExitError when the
// command exited with an error, and -1 when it could not be started or failed
// with any other kind of error. When the command cannot be started, ErrOutput
// carries the start error's message.
func (sr *ShellRunner) executeCommand(cmd *exec.Cmd, cancel context.CancelFunc) *CommandResult {
	if cancel != nil {
		defer cancel()
	}

	var outBuf, errBuf bytes.Buffer
	cmd.Stdout = &outBuf
	cmd.Stderr = &errBuf

	// Start and register under the lock: Kill then only ever sees commands
	// whose Process is set, and a command that failed to start is never left
	// behind in activeCmds.
	sr.mu.Lock()
	err := cmd.Start()
	if err == nil {
		sr.activeCmds[cmd] = struct{}{}
	}
	sr.mu.Unlock()
	if err != nil {
		return &CommandResult{
			Command:    cmd.String(),
			ReturnCode: -1,
			ErrOutput:  err.Error(),
		}
	}

	err = cmd.Wait()

	// The process has exited; it no longer needs to be reachable by Kill.
	sr.mu.Lock()
	delete(sr.activeCmds, cmd)
	sr.mu.Unlock()

	result := &CommandResult{
		Command:   cmd.String(),
		Output:    outBuf.String(),
		ErrOutput: errBuf.String(),
	}

	var exitErr *exec.ExitError
	switch {
	case err == nil:
		// Success: ReturnCode keeps its zero value.
	case errors.As(err, &exitErr):
		result.ReturnCode = exitErr.ExitCode()
	default:
		result.ReturnCode = -1
	}
	return result
}