Add polling for changes from Docker

This commit is contained in:
IamTheFij 2018-08-02 18:56:02 -07:00
parent 5d88657d96
commit 3c22782848
2 changed files with 39 additions and 15 deletions

View File

@ -1,3 +1,7 @@
# docker-batch-scheduler # docker-batch-scheduler
WIP for a docker batch scheduler WIP for a docker batch scheduler
Uses https://godoc.org/github.com/robfig/cron for cron expressions.
Sets schedules based on label `cron.schedule="* * * * *"`

48
main.go
View File

@ -10,6 +10,8 @@ import (
"time" "time"
) )
var WatchInterval = (5 * time.Second)
type ContainerStartJob struct { type ContainerStartJob struct {
Client *client.Client Client *client.Client
ContainerID string ContainerID string
@ -19,26 +21,20 @@ type ContainerStartJob struct {
} }
func (job ContainerStartJob) Run() { func (job ContainerStartJob) Run() {
fmt.Println("Starting: ", job.Name) fmt.Println("Starting:", job.Name)
err := job.Client.ContainerStart(job.Context, job.ContainerID, types.ContainerStartOptions{}) err := job.Client.ContainerStart(job.Context, job.ContainerID, types.ContainerStartOptions{})
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
func main() { func QueryScheduledJobs(cli *client.Client) (jobs []ContainerStartJob) {
cli, err := client.NewEnvClient() fmt.Println("Scanning containers for new schedules...")
if err != nil {
panic(err)
}
containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{All: true}) containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{All: true})
if err != nil { if err != nil {
panic(err) panic(err)
} }
jobs := []ContainerStartJob{}
for _, container := range containers { for _, container := range containers {
if val, ok := container.Labels["cron.schedule"]; ok { if val, ok := container.Labels["cron.schedule"]; ok {
jobName := strings.Join(container.Names, "/") jobName := strings.Join(container.Names, "/")
@ -52,19 +48,43 @@ func main() {
} }
} }
c := cron.New() return
}
func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) {
for _, job := range jobs { for _, job := range jobs {
fmt.Println("Scheduling ", job.Name, "(", job.Schedule, ")") fmt.Printf("Scheduling %s (%s) with schedule '%s'\n", job.Name, job.ContainerID[:10], job.Schedule)
c.AddJob(job.Schedule, job) c.AddJob(job.Schedule, job)
} }
}
// Start the cron job threads func main() {
c.Start() cli, err := client.NewEnvClient()
if err != nil {
panic(err)
}
// Create a cron
c := cron.New()
// Start the loop // Start the loop
for { for {
time.Sleep(5 * time.Second)
fmt.Println("Tick...") fmt.Println("Tick...")
// HACK: This is risky as it could fall on the same interval as a task and that task would get skipped
// It would be best to manage a ContainerID to Job mapping and then remove entries that are missing
// in the new list and add new entries. However, cron does not support this yet.
// Stop and create a new cron
c.Stop()
c = cron.New()
// Schedule jobs again
jobs := QueryScheduledJobs(cli)
ScheduleJobs(c, jobs)
c.Start()
// Sleep until the next query time
time.Sleep(WatchInterval)
} }
} }