dockron/main.go

347 lines
9.3 KiB
Go
Raw Normal View History

2018-08-01 15:56:13 +00:00
package main
import (
2018-08-04 00:45:53 +00:00
"flag"
2018-08-01 15:56:13 +00:00
"fmt"
2019-06-28 17:24:41 +00:00
"os"
"regexp"
2018-08-01 15:56:13 +00:00
"strings"
"time"
2020-08-07 22:00:30 +00:00
"git.iamthefij.com/iamthefij/slog"
2020-08-07 22:00:30 +00:00
dockerTypes "github.com/docker/docker/api/types"
2024-09-25 18:58:41 +00:00
"github.com/docker/docker/api/types/container"
2020-08-07 22:00:30 +00:00
dockerClient "github.com/docker/docker/client"
"github.com/robfig/cron/v3"
"golang.org/x/net/context"
2018-08-01 15:56:13 +00:00
)
2019-06-28 17:24:41 +00:00
var (
	// defaultWatchInterval is how long to sleep between polls of Docker for
	// container changes. It is the default for the -watch flag.
	defaultWatchInterval = 1 * time.Minute

	// schedLabel is the container label holding the cron expression for a
	// container start job.
	schedLabel = "dockron.schedule"

	// execLabelRegexp matches labels describing an exec job, of the form
	// dockron.<job name>.schedule or dockron.<job name>.command.
	// Submatch 1 is the job name and submatch 2 is the field name.
	// The pattern is anchored so that labels which merely contain this text
	// (e.g. "com.example.dockron.job.schedule") are not treated as jobs.
	execLabelRegexp = regexp.MustCompile(`^dockron\.([a-zA-Z0-9_-]+)\.(schedule|command)$`)

	// version of dockron being run. NOTE(review): presumably overridden at
	// build time via -ldflags — confirm against the build scripts.
	version = "dev"
)
2018-08-04 00:30:17 +00:00
// ContainerClient provides an interface for interracting with Docker
type ContainerClient interface {
2024-09-25 18:58:41 +00:00
ContainerExecCreate(ctx context.Context, container string, config container.ExecOptions) (dockerTypes.IDResponse, error)
ContainerExecInspect(ctx context.Context, execID string) (container.ExecInspect, error)
ContainerExecStart(ctx context.Context, execID string, config container.ExecStartOptions) error
ContainerInspect(ctx context.Context, containerID string) (dockerTypes.ContainerJSON, error)
2024-09-25 18:58:41 +00:00
ContainerList(context context.Context, options container.ListOptions) ([]dockerTypes.Container, error)
ContainerStart(context context.Context, containerID string, options container.StartOptions) error
}
// ContainerCronJob is a job that dockron schedules against a container.
//
// Run lets a value be handed directly to the cron scheduler as a job; the
// other methods describe the job for scheduling decisions and log output.
type ContainerCronJob interface {
	// Name returns the human-readable job name used in log messages.
	Name() string
	// UniqueName returns an identifier used to tell whether the job is
	// already scheduled.
	UniqueName() string
	// Schedule returns the cron expression the job runs on.
	Schedule() string
	// Run performs one execution of the job.
	Run()
}
2018-08-04 00:30:17 +00:00
// ContainerStartJob represents a scheduled container task
// It contains a reference to a client, the schedule to run on, and the
// ID of that container that should be started
2018-08-01 15:56:13 +00:00
type ContainerStartJob struct {
client ContainerClient
context context.Context
name string
containerID string
schedule string
2018-08-01 15:56:13 +00:00
}
2018-08-04 00:30:17 +00:00
// Run is executed based on the ContainerStartJob Schedule and starts the
// container
2018-08-01 15:56:13 +00:00
func (job ContainerStartJob) Run() {
slog.Infof("Starting: %s", job.name)
// Check if container is already running
containerJSON, err := job.client.ContainerInspect(
job.context,
job.containerID,
)
slog.OnErrPanicf(err, "Could not get container details for job %s", job.name)
if containerJSON.State.Running {
slog.Warningf("Container is already running. Skipping %s", job.name)
2021-04-30 18:24:14 +00:00
return
}
// Start job
err = job.client.ContainerStart(
job.context,
job.containerID,
2024-09-25 18:58:41 +00:00
container.StartOptions{},
)
slog.OnErrPanicf(err, "Could not start container for job %s", job.name)
// Check results of job
for check := true; check; check = containerJSON.State.Running {
slog.Debugf("Still running %s", job.name)
containerJSON, err = job.client.ContainerInspect(
job.context,
job.containerID,
)
slog.OnErrPanicf(err, "Could not get container details for job %s", job.name)
time.Sleep(1 * time.Second)
2018-08-01 15:56:13 +00:00
}
slog.Debugf("Done execing %s. %+v", job.name, containerJSON.State)
2024-09-25 18:58:41 +00:00
// Log exit code if failed
if containerJSON.State.ExitCode != 0 {
slog.Errorf(
2023-10-08 20:14:43 +00:00
"Exec job %s exited with code %d",
job.name,
containerJSON.State.ExitCode,
)
}
}
// Name returns the human-readable job name used in log messages.
func (job ContainerStartJob) Name() string { return job.name }
// Schedule returns the schedule of the job
func (job ContainerStartJob) Schedule() string {
return job.schedule
2018-08-01 15:56:13 +00:00
}
// UniqueName returns the job's name and its container's ID joined by "/".
//
// The container ID keeps the identifier unique: container labels are
// immutable, so changing a job's labels produces a new container with a new
// ID rather than altering this one.
func (job ContainerStartJob) UniqueName() string {
	parts := []string{job.name, job.containerID}
	return strings.Join(parts, "/")
}
// ContainerExecJob is a scheduled job that runs a shell command inside a
// container that is already running.
//
// It embeds ContainerStartJob to reuse its client, context, name, container
// ID and schedule, together with the Name, Schedule and UniqueName methods;
// it defines its own Run.
type ContainerExecJob struct {
	ContainerStartJob

	// shellCommand is run inside the container via `sh -c`.
	shellCommand string
}
// Run is executed based on the ContainerStartJob Schedule and starts the
// container
func (job ContainerExecJob) Run() {
slog.Infof("Execing: %s", job.name)
containerJSON, err := job.client.ContainerInspect(
job.context,
job.containerID,
)
slog.OnErrPanicf(err, "Could not get container details for job %s", job.name)
if !containerJSON.State.Running {
slog.Warningf("Container not running. Skipping %s", job.name)
2021-04-30 18:24:14 +00:00
return
}
execID, err := job.client.ContainerExecCreate(
job.context,
job.containerID,
2024-09-25 18:58:41 +00:00
container.ExecOptions{
Cmd: []string{"sh", "-c", strings.TrimSpace(job.shellCommand)},
},
)
slog.OnErrPanicf(err, "Could not create container exec job for %s", job.name)
err = job.client.ContainerExecStart(
job.context,
execID.ID,
2024-09-25 18:58:41 +00:00
container.ExecStartOptions{},
)
slog.OnErrPanicf(err, "Could not start container exec job for %s", job.name)
// Wait for job results
2024-09-25 18:58:41 +00:00
execInfo := container.ExecInspect{Running: true}
for execInfo.Running {
slog.Debugf("Still execing %s", job.name)
execInfo, err = job.client.ContainerExecInspect(
job.context,
execID.ID,
)
slog.Debugf("Exec info: %+v", execInfo)
2021-04-30 18:24:14 +00:00
if err != nil {
// Nothing we can do if we got an error here, so let's go
slog.OnErrWarnf(err, "Could not get status for exec job %s", job.name)
2021-04-30 18:24:14 +00:00
return
}
2021-04-30 18:24:14 +00:00
time.Sleep(1 * time.Second)
}
slog.Debugf("Done execing %s. %+v", job.name, execInfo)
// Log exit code if failed
if execInfo.ExitCode != 0 {
slog.Errorf("Exec job %s existed with code %d", job.name, execInfo.ExitCode)
}
}
// QueryScheduledJobs queries Docker for all containers with a schedule and
// returns a list of ContainerCronJob records to be scheduled
func QueryScheduledJobs(client ContainerClient) (jobs []ContainerCronJob) {
slog.Debugf("Scanning containers for new schedules...")
containers, err := client.ContainerList(
context.Background(),
2024-09-25 18:58:41 +00:00
container.ListOptions{All: true},
)
slog.OnErrPanicf(err, "Failure querying docker containers")
2018-08-01 15:56:13 +00:00
for _, container := range containers {
// Add start job
2019-06-28 17:24:41 +00:00
if val, ok := container.Labels[schedLabel]; ok {
2018-08-01 15:56:13 +00:00
jobName := strings.Join(container.Names, "/")
2021-04-30 18:24:14 +00:00
2018-08-01 15:59:52 +00:00
jobs = append(jobs, ContainerStartJob{
client: client,
containerID: container.ID,
context: context.Background(),
schedule: val,
name: jobName,
})
}
// Add exec jobs
execJobs := map[string]map[string]string{}
2021-04-30 18:24:14 +00:00
for label, value := range container.Labels {
results := execLabelRegexp.FindStringSubmatch(label)
2021-04-30 18:24:14 +00:00
expectedLabelParts := 3
if len(results) == expectedLabelParts {
// We've got part of a new job
jobName, jobField := results[1], results[2]
if partJob, ok := execJobs[jobName]; ok {
// Partial exists, add the other value
partJob[jobField] = value
} else {
// No partial exists, add this part
execJobs[jobName] = map[string]string{
jobField: value,
}
}
}
}
2021-04-30 18:24:14 +00:00
for jobName, jobConfig := range execJobs {
schedule, ok := jobConfig["schedule"]
if !ok {
continue
}
2021-04-30 18:24:14 +00:00
shellCommand, ok := jobConfig["command"]
if !ok {
continue
}
2021-04-30 18:24:14 +00:00
jobs = append(jobs, ContainerExecJob{
ContainerStartJob: ContainerStartJob{
client: client,
containerID: container.ID,
context: context.Background(),
schedule: schedule,
name: strings.Join(append(container.Names, jobName), "/"),
},
shellCommand: shellCommand,
2018-08-01 15:56:13 +00:00
})
}
}
2021-04-30 18:24:14 +00:00
return jobs
2018-08-03 01:56:02 +00:00
}
2018-08-01 15:59:52 +00:00
2018-08-04 00:30:17 +00:00
// ScheduleJobs accepts a Cron instance and a list of jobs to schedule.
// It then schedules the provided jobs
func ScheduleJobs(c *cron.Cron, jobs []ContainerCronJob) {
// Fetch existing jobs from the cron
existingJobs := map[string]cron.EntryID{}
for _, entry := range c.Entries() {
// This should be safe since ContainerCronJob is the only type of job we use
existingJobs[entry.Job.(ContainerCronJob).UniqueName()] = entry.ID
}
2018-08-01 15:59:52 +00:00
for _, job := range jobs {
if _, ok := existingJobs[job.UniqueName()]; ok {
// Job already exists, remove it from existing jobs so we don't
// unschedule it later
slog.Debugf("Job %s is already scheduled. Skipping", job.Name())
delete(existingJobs, job.UniqueName())
2021-04-30 18:24:14 +00:00
continue
}
// Job doesn't exist yet, schedule it
_, err := c.AddJob(job.Schedule(), job)
2020-04-06 23:53:28 +00:00
if err == nil {
slog.Infof(
"Scheduled %s (%s) with schedule '%s'\n",
job.Name(),
job.UniqueName(),
job.Schedule(),
)
2020-04-06 23:53:28 +00:00
} else {
// TODO: Track something for a healthcheck here
slog.Errorf(
"Could not schedule %s (%s) with schedule '%s'. %v\n",
job.Name(),
job.UniqueName(),
job.Schedule(),
err,
)
2020-04-06 23:53:28 +00:00
}
2018-08-01 15:59:52 +00:00
}
// Remove remaining scheduled jobs that weren't in the new list
for _, entryID := range existingJobs {
c.Remove(entryID)
}
2018-08-03 01:56:02 +00:00
}
func main() {
2018-08-04 00:30:17 +00:00
// Get a Docker Client
2020-08-07 22:43:17 +00:00
client, err := dockerClient.NewClientWithOpts(dockerClient.FromEnv)
2021-04-30 18:24:14 +00:00
slog.OnErrPanicf(err, "Could not create Docker client")
2018-08-01 15:59:52 +00:00
2018-08-04 00:45:53 +00:00
// Read interval for polling Docker
var watchInterval time.Duration
2021-04-30 18:24:14 +00:00
showVersion := flag.Bool("version", false, "Display the version of dockron and exit")
2019-06-28 17:24:41 +00:00
flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes")
flag.BoolVar(&slog.DebugLevel, "debug", false, "Show debug logs")
2018-08-04 00:45:53 +00:00
flag.Parse()
2019-06-28 17:24:41 +00:00
// Print version if asked
if *showVersion {
fmt.Println("Dockron version:", version)
os.Exit(0)
}
2018-08-04 00:30:17 +00:00
// Create a Cron
2018-08-03 01:56:02 +00:00
c := cron.New()
c.Start()
2018-08-01 15:56:13 +00:00
// Start the loop
for {
// Schedule jobs again
jobs := QueryScheduledJobs(client)
2018-08-03 01:56:02 +00:00
ScheduleJobs(c, jobs)
2020-08-07 22:00:30 +00:00
2018-08-03 01:56:02 +00:00
// Sleep until the next query time
2018-08-04 00:45:53 +00:00
time.Sleep(watchInterval)
2018-08-01 15:56:13 +00:00
}
}