Update jobs only when a new container id is found

Rather than trashing the whole schedule, instead compare what is
currently scheduled with the newly found containers and then add or
remove jobs as needed.

Fixes #2
This commit is contained in:
IamTheFij 2020-04-07 09:58:12 -07:00
parent bda0ce4b1f
commit 050465b0aa
3 changed files with 63 additions and 28 deletions

View File

@ -1,3 +1,4 @@
.PHONY: test all
DOCKER_TAG ?= dockron-dev-${USER}
GIT_TAG_NAME := $(shell git tag -l --contains HEAD)
GIT_SHA := $(shell git rev-parse HEAD)
@ -13,7 +14,7 @@ vendor:
# Runs the application, useful while developing
.PHONY: run
run:
go run .
go run . -watch 10s -debug
.PHONY: test
test:

74
main.go
View File

@ -23,6 +23,9 @@ var (
// version of dockron being run
version = "dev"
// Debug can be used to show or supress certain log messages
Debug = true
)
// ContainerClient provides an interface for interracting with Docker
@ -46,17 +49,33 @@ type ContainerStartJob struct {
// container
func (job ContainerStartJob) Run() {
log.Println("Starting:", job.Name)
err := job.Client.ContainerStart(job.Context, job.ContainerID, dockerTypes.ContainerStartOptions{})
err := job.Client.ContainerStart(
job.Context,
job.ContainerID,
dockerTypes.ContainerStartOptions{},
)
if err != nil {
panic(err)
}
}
// UniqueName returns a unique identifier for a container start job
func (job ContainerStartJob) UniqueName() string {
// ContainerID should be unique as a change in label will result in
// a new container as they are immutable
return job.ContainerID
}
// QueryScheduledJobs queries Docker for all containers with a schedule and
// returns a list of ContainerStartJob records to be scheduled
func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) {
if Debug {
log.Println("Scanning containers for new schedules...")
containers, err := client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{All: true})
}
containers, err := client.ContainerList(
context.Background(),
dockerTypes.ContainerListOptions{All: true},
)
if err != nil {
panic(err)
}
@ -80,16 +99,48 @@ func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) {
// ScheduleJobs accepts a Cron instance and a list of jobs to schedule.
// It then schedules the provided jobs
func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) {
// Fetch existing jobs from the cron
existingJobs := map[string]cron.EntryID{}
for _, entry := range c.Entries() {
existingJobs[entry.Job.(ContainerStartJob).UniqueName()] = entry.ID
}
for _, job := range jobs {
// TODO: Do something with the entryId returned here
if _, ok := existingJobs[job.UniqueName()]; ok {
// Job already exists, remove it from existing jobs so we don't
// unschedule it later
if Debug {
log.Printf("Job %s is already scheduled. Skipping", job.Name)
}
delete(existingJobs, job.UniqueName())
continue
}
// Job doesn't exist yet, schedule it
_, err := c.AddJob(job.Schedule, job)
if err == nil {
log.Printf("Scheduled %s (%s) with schedule '%s'\n", job.Name, job.ContainerID[:10], job.Schedule)
log.Printf(
"Scheduled %s (%s) with schedule '%s'\n",
job.Name,
job.ContainerID[:10],
job.Schedule,
)
} else {
// TODO: Track something for a healthcheck here
log.Printf("Error scheduling %s (%s) with schedule '%s'. %v\n", job.Name, job.ContainerID[:10], job.Schedule, err)
log.Printf(
"Error scheduling %s (%s) with schedule '%s'. %v\n",
job.Name,
job.ContainerID[:10],
job.Schedule,
err,
)
}
}
// Remove remaining scheduled jobs that weren't in the new list
for _, entryID := range existingJobs {
c.Remove(entryID)
}
}
func main() {
@ -103,6 +154,7 @@ func main() {
var watchInterval time.Duration
flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes")
var showVersion = flag.Bool("version", false, "Display the version of dockron and exit")
flag.BoolVar(&Debug, "debug", false, "Show debug logs")
flag.Parse()
// Print version if asked
@ -113,22 +165,14 @@ func main() {
// Create a Cron
c := cron.New()
c.Start()
// Start the loop
for {
// HACK: This is risky as it could fall on the same interval as a task and that task would get skipped
// It would be best to manage a ContainerID to Job mapping and then remove entries that are missing
// in the new list and add new entries. However, cron does not support this yet.
// Stop and create a new cron
c.Stop()
c = cron.New()
// Schedule jobs again
jobs := QueryScheduledJobs(client)
ScheduleJobs(c, jobs)
c.Start()
// Sleep until the next query time
time.Sleep(watchInterval)
}

View File

@ -171,10 +171,7 @@ func TestScheduleJobs(t *testing.T) {
}
})
// Subsequently scheduled jobs will append since we currently just stop and create a new cron
// Eventually this test case should change when proper removal is supported
t.Run("Schedule a second job", func(t *testing.T) {
t.Run("Schedule a second job removing the first", func(t *testing.T) {
log.Printf("Running %s", t.Name())
jobs := []ContainerStartJob{
ContainerStartJob{
@ -184,13 +181,6 @@ func TestScheduleJobs(t *testing.T) {
},
}
ScheduleJobs(c, jobs)
jobs = append([]ContainerStartJob{
ContainerStartJob{
ContainerID: "0123456789/has_schedule_1",
Name: "has_schedule_1",
Schedule: "* * * * *",
},
}, jobs...)
scheduledEntries := c.Entries()