Update jobs only when a new container id is found
All checks were successful
continuous-integration/drone/push Build is passing

Rather than trashing the whole schedule, instead compare what is
currently scheduled with the newly found containers and then add or
remove jobs as needed.

Fixes #2
This commit is contained in:
IamTheFij 2020-04-07 09:58:12 -07:00
parent 6e324795d4
commit 7b2eb917ad
2 changed files with 61 additions and 16 deletions

View File

@ -1,3 +1,4 @@
.PHONY: test all
DOCKER_TAG ?= dockron-dev-${USER} DOCKER_TAG ?= dockron-dev-${USER}
GIT_TAG_NAME := $(shell git tag -l --contains HEAD) GIT_TAG_NAME := $(shell git tag -l --contains HEAD)
GIT_SHA := $(shell git rev-parse HEAD) GIT_SHA := $(shell git rev-parse HEAD)
@ -13,7 +14,7 @@ vendor:
# Runs the application, useful while developing # Runs the application, useful while developing
.PHONY: run .PHONY: run
run: run:
go run *.go go run *.go -watch 10s -debug
# Output target # Output target
dockron: dockron:

74
main.go
View File

@ -22,6 +22,9 @@ var (
// version of dockron being run // version of dockron being run
version = "dev" version = "dev"
// Debug can be used to show or suppress certain log messages
Debug = true
) )
// ContainerStartJob represents a scheduled container task // ContainerStartJob represents a scheduled container task
@ -39,17 +42,33 @@ type ContainerStartJob struct {
// container // container
func (job ContainerStartJob) Run() { func (job ContainerStartJob) Run() {
log.Println("Starting:", job.Name) log.Println("Starting:", job.Name)
err := job.Client.ContainerStart(job.Context, job.ContainerID, types.ContainerStartOptions{}) err := job.Client.ContainerStart(
job.Context,
job.ContainerID,
types.ContainerStartOptions{},
)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
// UniqueName returns a unique identifier for a container start job
func (job ContainerStartJob) UniqueName() string {
// ContainerID should be unique as a change in label will result in
// a new container as they are immutable
return job.ContainerID
}
// QueryScheduledJobs queries Docker for all containers with a schedule and // QueryScheduledJobs queries Docker for all containers with a schedule and
// returns a list of ContainerStartJob records to be scheduled // returns a list of ContainerStartJob records to be scheduled
func QueryScheduledJobs(cli *client.Client) (jobs []ContainerStartJob) { func QueryScheduledJobs(cli *client.Client) (jobs []ContainerStartJob) {
log.Println("Scanning containers for new schedules...") if Debug {
containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{All: true}) log.Println("Scanning containers for new schedules...")
}
containers, err := cli.ContainerList(
context.Background(),
types.ContainerListOptions{All: true},
)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -73,16 +92,48 @@ func QueryScheduledJobs(cli *client.Client) (jobs []ContainerStartJob) {
// ScheduleJobs accepts a Cron instance and a list of jobs to schedule. // ScheduleJobs accepts a Cron instance and a list of jobs to schedule.
// It then schedules the provided jobs // It then schedules the provided jobs
func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) { func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) {
// Fetch existing jobs from the cron
existingJobs := map[string]cron.EntryID{}
for _, entry := range c.Entries() {
existingJobs[entry.Job.(ContainerStartJob).UniqueName()] = entry.ID
}
for _, job := range jobs { for _, job := range jobs {
// TODO: Do something with the entryId returned here if _, ok := existingJobs[job.UniqueName()]; ok {
// Job already exists, remove it from existing jobs so we don't
// unschedule it later
if Debug {
log.Printf("Job %s is already scheduled. Skipping", job.Name)
}
delete(existingJobs, job.UniqueName())
continue
}
// Job doesn't exist yet, schedule it
_, err := c.AddJob(job.Schedule, job) _, err := c.AddJob(job.Schedule, job)
if err == nil { if err == nil {
log.Printf("Scheduled %s (%s) with schedule '%s'\n", job.Name, job.ContainerID[:10], job.Schedule) log.Printf(
"Scheduled %s (%s) with schedule '%s'\n",
job.Name,
job.ContainerID[:10],
job.Schedule,
)
} else { } else {
// TODO: Track something for a healthcheck here // TODO: Track something for a healthcheck here
log.Printf("Error scheduling %s (%s) with schedule '%s'. %v", job.Name, job.ContainerID[:10], job.Schedule, err) log.Printf(
"Error scheduling %s (%s) with schedule '%s'. %v",
job.Name,
job.ContainerID[:10],
job.Schedule,
err,
)
} }
} }
// Remove remaining scheduled jobs that weren't in the new list
for _, entryID := range existingJobs {
c.Remove(entryID)
}
} }
func main() { func main() {
@ -96,6 +147,7 @@ func main() {
var watchInterval time.Duration var watchInterval time.Duration
flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes") flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes")
var showVersion = flag.Bool("version", false, "Display the version of dockron and exit") var showVersion = flag.Bool("version", false, "Display the version of dockron and exit")
flag.BoolVar(&Debug, "debug", false, "Show debug logs")
flag.Parse() flag.Parse()
// Print version if asked // Print version if asked
@ -106,21 +158,13 @@ func main() {
// Create a Cron // Create a Cron
c := cron.New() c := cron.New()
c.Start()
// Start the loop // Start the loop
for { for {
// HACK: This is risky as it could fall on the same interval as a task and that task would get skipped
// It would be best to manage a ContainerID to Job mapping and then remove entries that are missing
// in the new list and add new entries. However, cron does not support this yet.
// Stop and create a new cron
c.Stop()
c = cron.New()
// Schedule jobs again // Schedule jobs again
jobs := QueryScheduledJobs(cli) jobs := QueryScheduledJobs(cli)
ScheduleJobs(c, jobs) ScheduleJobs(c, jobs)
c.Start()
// Sleep until the next query time // Sleep until the next query time
time.Sleep(watchInterval) time.Sleep(watchInterval)