diff --git a/README.md b/README.md index 69c6a32..4c806ae 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ # docker-batch-scheduler -WIP for a docker batch scheduler \ No newline at end of file +WIP for a docker batch scheduler + +Uses https://godoc.org/github.com/robfig/cron for cron expressions. + +Sets schedules based on label `cron.schedule="* * * * *"` diff --git a/main.go b/main.go index 134a479..baa9e0f 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,8 @@ import ( "time" ) +var WatchInterval = (5 * time.Second) + type ContainerStartJob struct { Client *client.Client ContainerID string @@ -19,26 +21,20 @@ type ContainerStartJob struct { } func (job ContainerStartJob) Run() { - fmt.Println("Starting: ", job.Name) + fmt.Println("Starting:", job.Name) err := job.Client.ContainerStart(job.Context, job.ContainerID, types.ContainerStartOptions{}) if err != nil { panic(err) } } -func main() { - cli, err := client.NewEnvClient() - if err != nil { - panic(err) - } - +func QueryScheduledJobs(cli *client.Client) (jobs []ContainerStartJob) { + fmt.Println("Scanning containers for new schedules...") containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{All: true}) if err != nil { panic(err) } - jobs := []ContainerStartJob{} - for _, container := range containers { if val, ok := container.Labels["cron.schedule"]; ok { jobName := strings.Join(container.Names, "/") @@ -52,19 +48,43 @@ func main() { } } - c := cron.New() + return +} +func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) { for _, job := range jobs { - fmt.Println("Scheduling ", job.Name, "(", job.Schedule, ")") + fmt.Printf("Scheduling %s (%s) with schedule '%s'\n", job.Name, job.ContainerID[:10], job.Schedule) c.AddJob(job.Schedule, job) } +} - // Start the cron job threads - c.Start() +func main() { + cli, err := client.NewEnvClient() + if err != nil { + panic(err) + } + + // Create a cron + c := cron.New() // Start the loop for { - time.Sleep(5 * time.Second) fmt.Println("Tick...") + + // HACK: This is risky as it could fall on the same interval as a task and that task would get skipped + // It would be best to manage a ContainerID to Job mapping and then remove entries that are missing + // in the new list and add new entries. However, cron does not support this yet. + + // Stop and create a new cron + c.Stop() + c = cron.New() + + // Schedule jobs again + jobs := QueryScheduledJobs(cli) + ScheduleJobs(c, jobs) + c.Start() + + // Sleep until the next query time + time.Sleep(WatchInterval) } }