diff --git a/Makefile b/Makefile index 281d0d9..ba4341d 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,4 @@ +.PHONY: test all DOCKER_TAG ?= dockron-dev-${USER} GIT_TAG_NAME := $(shell git tag -l --contains HEAD) GIT_SHA := $(shell git rev-parse HEAD) @@ -13,7 +14,7 @@ vendor: # Runs the application, useful while developing .PHONY: run run: - go run . + go run . -watch 10s -debug .PHONY: test test: diff --git a/main.go b/main.go index 7d6b8e9..eae98bd 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,9 @@ var ( // version of dockron being run version = "dev" + + // Debug can be used to show or supress certain log messages + Debug = true ) // ContainerClient provides an interface for interracting with Docker @@ -46,17 +49,33 @@ type ContainerStartJob struct { // container func (job ContainerStartJob) Run() { log.Println("Starting:", job.Name) - err := job.Client.ContainerStart(job.Context, job.ContainerID, dockerTypes.ContainerStartOptions{}) + err := job.Client.ContainerStart( + job.Context, + job.ContainerID, + dockerTypes.ContainerStartOptions{}, + ) if err != nil { panic(err) } } +// UniqueName returns a unique identifier for a container start job +func (job ContainerStartJob) UniqueName() string { + // ContainerID should be unique as a change in label will result in + // a new container as they are immutable + return job.ContainerID +} + // QueryScheduledJobs queries Docker for all containers with a schedule and // returns a list of ContainerStartJob records to be scheduled func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) { - log.Println("Scanning containers for new schedules...") - containers, err := client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{All: true}) + if Debug { + log.Println("Scanning containers for new schedules...") + } + containers, err := client.ContainerList( + context.Background(), + dockerTypes.ContainerListOptions{All: true}, + ) if err != nil { panic(err) } @@ -80,16 +99,48 @@ func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) { // ScheduleJobs accepts a Cron instance and a list of jobs to schedule. // It then schedules the provided jobs func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) { + // Fetch existing jobs from the cron + existingJobs := map[string]cron.EntryID{} + for _, entry := range c.Entries() { + existingJobs[entry.Job.(ContainerStartJob).UniqueName()] = entry.ID + } + for _, job := range jobs { - // TODO: Do something with the entryId returned here + if _, ok := existingJobs[job.UniqueName()]; ok { + // Job already exists, remove it from existing jobs so we don't + // unschedule it later + if Debug { + log.Printf("Job %s is already scheduled. Skipping", job.Name) + } + delete(existingJobs, job.UniqueName()) + continue + } + + // Job doesn't exist yet, schedule it _, err := c.AddJob(job.Schedule, job) if err == nil { - log.Printf("Scheduled %s (%s) with schedule '%s'\n", job.Name, job.ContainerID[:10], job.Schedule) + log.Printf( + "Scheduled %s (%s) with schedule '%s'\n", + job.Name, + job.ContainerID[:10], + job.Schedule, + ) } else { // TODO: Track something for a healthcheck here - log.Printf("Error scheduling %s (%s) with schedule '%s'. %v\n", job.Name, job.ContainerID[:10], job.Schedule, err) + log.Printf( + "Error scheduling %s (%s) with schedule '%s'. %v\n", + job.Name, + job.ContainerID[:10], + job.Schedule, + err, + ) } } + + // Remove remaining scheduled jobs that weren't in the new list + for _, entryID := range existingJobs { + c.Remove(entryID) + } } func main() { @@ -103,6 +154,7 @@ func main() { var watchInterval time.Duration flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes") var showVersion = flag.Bool("version", false, "Display the version of dockron and exit") + flag.BoolVar(&Debug, "debug", false, "Show debug logs") flag.Parse() // Print version if asked @@ -113,22 +165,14 @@ func main() { // Create a Cron c := cron.New() + c.Start() // Start the loop for { - // HACK: This is risky as it could fall on the same interval as a task and that task would get skipped - // It would be best to manage a ContainerID to Job mapping and then remove entries that are missing - // in the new list and add new entries. However, cron does not support this yet. - - // Stop and create a new cron - c.Stop() - c = cron.New() - + // Schedule jobs again jobs := QueryScheduledJobs(client) ScheduleJobs(c, jobs) - c.Start() - // Sleep until the next query time time.Sleep(watchInterval) } diff --git a/main_test.go b/main_test.go index fccafe8..0b36662 100644 --- a/main_test.go +++ b/main_test.go @@ -171,10 +171,7 @@ func TestScheduleJobs(t *testing.T) { } }) - // Subsequently scheduled jobs will append since we currently just stop and create a new cron - // Eventually this test case should change when proper removal is supported - - t.Run("Schedule a second job", func(t *testing.T) { + t.Run("Schedule a second job removing the first", func(t *testing.T) { log.Printf("Running %s", t.Name()) jobs := []ContainerStartJob{ ContainerStartJob{ @@ -184,13 +181,6 @@ func TestScheduleJobs(t *testing.T) { }, } ScheduleJobs(c, jobs) - jobs = append([]ContainerStartJob{ - ContainerStartJob{ - ContainerID: "0123456789/has_schedule_1", - Name: "has_schedule_1", - Schedule: "* * * * *", - }, - }, jobs...) scheduledEntries := c.Entries()