Add the ability to create execute jobs on long running containers
This commit is contained in:
parent
5c5fda3ddf
commit
3ee32b632a
@ -10,9 +10,17 @@ services:
|
|||||||
volumes:
|
volumes:
|
||||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||||
|
|
||||||
echoer:
|
start_echoer:
|
||||||
image: busybox:latest
|
image: busybox:latest
|
||||||
command: ["date"]
|
command: ["date"]
|
||||||
labels:
|
labels:
|
||||||
# Execute every minute
|
# Execute every minute
|
||||||
- 'dockron.schedule=* * * * *'
|
- 'dockron.schedule=* * * * *'
|
||||||
|
|
||||||
|
exec_echoer:
|
||||||
|
image: busybox:latest
|
||||||
|
command: sh -c "date > /out && tail -f /out"
|
||||||
|
labels:
|
||||||
|
# Execute every minute
|
||||||
|
- 'dockron.date.schedule=* * * * *'
|
||||||
|
- 'dockron.date.command=date >> /out'
|
||||||
|
57
logging.go
Normal file
57
logging.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// DebugLevel indicates if we should log at the debug level
|
||||||
|
DebugLevel = true
|
||||||
|
|
||||||
|
loggerDebug = log.New(os.Stderr, "DEBUG", log.LstdFlags)
|
||||||
|
loggerWarning = log.New(os.Stderr, "WARNING", log.LstdFlags)
|
||||||
|
loggerError = log.New(os.Stderr, "ERROR", log.LstdFlags)
|
||||||
|
)
|
||||||
|
|
||||||
|
// LogDebug will log with a DEBUG prefix if DebugLevel is set
|
||||||
|
func LogDebug(format string, v ...interface{}) {
|
||||||
|
if !DebugLevel {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
loggerDebug.Printf(format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogWarning will log with a WARNING prefix
|
||||||
|
func LogWarning(format string, v ...interface{}) {
|
||||||
|
loggerWarning.Printf(format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogError will log with a ERROR prefix
|
||||||
|
func LogError(format string, v ...interface{}) {
|
||||||
|
loggerError.Printf(format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WarnErr will provide a warning if an error is provided
|
||||||
|
func WarnErr(err error, format string, v ...interface{}) {
|
||||||
|
if err != nil {
|
||||||
|
loggerWarning.Printf(format, v...)
|
||||||
|
loggerError.Print(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FatalErr will log out details of an error and exit
|
||||||
|
func FatalErr(err error, format string, v ...interface{}) {
|
||||||
|
if err != nil {
|
||||||
|
loggerError.Printf(format, v...)
|
||||||
|
loggerError.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PanicErr will log out details of an error and exit
|
||||||
|
func PanicErr(err error, format string, v ...interface{}) {
|
||||||
|
if err != nil {
|
||||||
|
loggerError.Printf(format, v...)
|
||||||
|
loggerError.Panic(err)
|
||||||
|
}
|
||||||
|
}
|
238
main.go
238
main.go
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -20,75 +21,225 @@ var (
|
|||||||
|
|
||||||
// schedLabel is the string label to search for cron expressions
|
// schedLabel is the string label to search for cron expressions
|
||||||
schedLabel = "dockron.schedule"
|
schedLabel = "dockron.schedule"
|
||||||
|
// execLabelRegex is will capture labels for an exec job
|
||||||
|
execLabelRegexp = regexp.MustCompile(`dockron\.([a-zA-Z0-9_-]+)\.(schedule|command)`)
|
||||||
|
|
||||||
// version of dockron being run
|
// version of dockron being run
|
||||||
version = "dev"
|
version = "dev"
|
||||||
|
|
||||||
// Debug can be used to show or supress certain log messages
|
|
||||||
Debug = true
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContainerClient provides an interface for interracting with Docker
|
// ContainerClient provides an interface for interracting with Docker
|
||||||
type ContainerClient interface {
|
type ContainerClient interface {
|
||||||
ContainerStart(context context.Context, containerID string, options dockerTypes.ContainerStartOptions) error
|
ContainerExecCreate(ctx context.Context, container string, config dockerTypes.ExecConfig) (dockerTypes.IDResponse, error)
|
||||||
|
ContainerExecInspect(ctx context.Context, execID string) (dockerTypes.ContainerExecInspect, error)
|
||||||
|
ContainerExecStart(ctx context.Context, execID string, config dockerTypes.ExecStartCheck) error
|
||||||
|
ContainerInspect(ctx context.Context, containerID string) (dockerTypes.ContainerJSON, error)
|
||||||
ContainerList(context context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error)
|
ContainerList(context context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error)
|
||||||
|
ContainerStart(context context.Context, containerID string, options dockerTypes.ContainerStartOptions) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerCronJob is an interface of a job to run on containers
|
||||||
|
type ContainerCronJob interface {
|
||||||
|
Run()
|
||||||
|
Name() string
|
||||||
|
UniqueName() string
|
||||||
|
Schedule() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerStartJob represents a scheduled container task
|
// ContainerStartJob represents a scheduled container task
|
||||||
// It contains a reference to a client, the schedule to run on, and the
|
// It contains a reference to a client, the schedule to run on, and the
|
||||||
// ID of that container that should be started
|
// ID of that container that should be started
|
||||||
type ContainerStartJob struct {
|
type ContainerStartJob struct {
|
||||||
Client ContainerClient
|
client ContainerClient
|
||||||
ContainerID string
|
context context.Context
|
||||||
Context context.Context
|
name string
|
||||||
Name string
|
containerID string
|
||||||
Schedule string
|
schedule string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run is executed based on the ContainerStartJob Schedule and starts the
|
// Run is executed based on the ContainerStartJob Schedule and starts the
|
||||||
// container
|
// container
|
||||||
func (job ContainerStartJob) Run() {
|
func (job ContainerStartJob) Run() {
|
||||||
log.Println("Starting:", job.Name)
|
log.Println("Starting:", job.name)
|
||||||
err := job.Client.ContainerStart(
|
|
||||||
job.Context,
|
// Check if container is already running
|
||||||
job.ContainerID,
|
containerJSON, err := job.client.ContainerInspect(
|
||||||
|
job.context,
|
||||||
|
job.containerID,
|
||||||
|
)
|
||||||
|
PanicErr(err, "Could not get container details for job %s", job.name)
|
||||||
|
|
||||||
|
if containerJSON.State.Running {
|
||||||
|
LogWarning("Container is already running. Skipping %s", job.name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start job
|
||||||
|
err = job.client.ContainerStart(
|
||||||
|
job.context,
|
||||||
|
job.containerID,
|
||||||
dockerTypes.ContainerStartOptions{},
|
dockerTypes.ContainerStartOptions{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
PanicErr(err, "Could not start container for jobb %s", job.name)
|
||||||
panic(err)
|
|
||||||
|
// Check results of job
|
||||||
|
for check := true; check; check = containerJSON.State.Running {
|
||||||
|
LogDebug("Still running %s", job.name)
|
||||||
|
|
||||||
|
containerJSON, err = job.client.ContainerInspect(
|
||||||
|
job.context,
|
||||||
|
job.containerID,
|
||||||
|
)
|
||||||
|
PanicErr(err, "Could not get container details for job %s", job.name)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
|
LogDebug("Done execing %s. %+v", job.name, containerJSON.State)
|
||||||
|
// Log exit code if failed
|
||||||
|
if containerJSON.State.ExitCode != 0 {
|
||||||
|
LogError(
|
||||||
|
"Exec job %s existed with code %d",
|
||||||
|
job.name,
|
||||||
|
containerJSON.State.ExitCode,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the name of the job
|
||||||
|
func (job ContainerStartJob) Name() string {
|
||||||
|
return job.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Schedule returns the schedule of the job
|
||||||
|
func (job ContainerStartJob) Schedule() string {
|
||||||
|
return job.schedule
|
||||||
}
|
}
|
||||||
|
|
||||||
// UniqueName returns a unique identifier for a container start job
|
// UniqueName returns a unique identifier for a container start job
|
||||||
func (job ContainerStartJob) UniqueName() string {
|
func (job ContainerStartJob) UniqueName() string {
|
||||||
// ContainerID should be unique as a change in label will result in
|
// ContainerID should be unique as a change in label will result in
|
||||||
// a new container as they are immutable
|
// a new container as they are immutable
|
||||||
return job.ContainerID
|
return job.name + "/" + job.containerID
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryScheduledJobs queries Docker for all containers with a schedule and
|
// ContainerExecJob is a scheduled job to be executed in a running container
|
||||||
// returns a list of ContainerStartJob records to be scheduled
|
type ContainerExecJob struct {
|
||||||
func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) {
|
ContainerStartJob
|
||||||
if Debug {
|
shellCommand string
|
||||||
log.Println("Scanning containers for new schedules...")
|
|
||||||
}
|
}
|
||||||
containers, err := client.ContainerList(
|
|
||||||
context.Background(),
|
// Run is executed based on the ContainerStartJob Schedule and starts the
|
||||||
dockerTypes.ContainerListOptions{All: true},
|
// container
|
||||||
|
func (job ContainerExecJob) Run() {
|
||||||
|
log.Println("Execing:", job.name)
|
||||||
|
containerJSON, err := job.client.ContainerInspect(
|
||||||
|
job.context,
|
||||||
|
job.containerID,
|
||||||
|
)
|
||||||
|
PanicErr(err, "Could not get container details for job %s", job.name)
|
||||||
|
|
||||||
|
if !containerJSON.State.Running {
|
||||||
|
LogWarning("Container not running. Skipping %s", job.name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
execID, err := job.client.ContainerExecCreate(
|
||||||
|
job.context,
|
||||||
|
job.containerID,
|
||||||
|
dockerTypes.ExecConfig{
|
||||||
|
Cmd: []string{"sh", "-c", strings.TrimSpace(job.shellCommand)},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
PanicErr(err, "Could not create container exec job for %s", job.name)
|
||||||
|
|
||||||
|
err = job.client.ContainerExecStart(
|
||||||
|
job.context,
|
||||||
|
execID.ID,
|
||||||
|
dockerTypes.ExecStartCheck{},
|
||||||
|
)
|
||||||
|
PanicErr(err, "Could not start container exec job for %s", job.name)
|
||||||
|
|
||||||
|
// Wait for job results
|
||||||
|
execInfo := dockerTypes.ContainerExecInspect{Running: true}
|
||||||
|
for execInfo.Running {
|
||||||
|
LogDebug("Still execing %s", job.name)
|
||||||
|
execInfo, err = job.client.ContainerExecInspect(
|
||||||
|
job.context,
|
||||||
|
execID.ID,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
LogDebug("Done execing %s. %+v", job.name, execInfo)
|
||||||
|
// Log exit code if failed
|
||||||
|
if execInfo.ExitCode != 0 {
|
||||||
|
LogError("Exec job %s existed with code %d", job.name, execInfo.ExitCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryScheduledJobs queries Docker for all containers with a schedule and
|
||||||
|
// returns a list of ContainerCronJob records to be scheduled
|
||||||
|
func QueryScheduledJobs(client ContainerClient) (jobs []ContainerCronJob) {
|
||||||
|
LogDebug("Scanning containers for new schedules...")
|
||||||
|
|
||||||
|
containers, err := client.ContainerList(
|
||||||
|
context.Background(),
|
||||||
|
dockerTypes.ContainerListOptions{All: true},
|
||||||
|
)
|
||||||
|
PanicErr(err, "Failure querying docker containers")
|
||||||
|
|
||||||
for _, container := range containers {
|
for _, container := range containers {
|
||||||
|
// Add start job
|
||||||
if val, ok := container.Labels[schedLabel]; ok {
|
if val, ok := container.Labels[schedLabel]; ok {
|
||||||
jobName := strings.Join(container.Names, "/")
|
jobName := strings.Join(container.Names, "/")
|
||||||
jobs = append(jobs, ContainerStartJob{
|
jobs = append(jobs, ContainerStartJob{
|
||||||
Schedule: val,
|
client: client,
|
||||||
Client: client,
|
containerID: container.ID,
|
||||||
ContainerID: container.ID,
|
context: context.Background(),
|
||||||
Context: context.Background(),
|
schedule: val,
|
||||||
Name: jobName,
|
name: jobName,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add exec jobs
|
||||||
|
execJobs := map[string]map[string]string{}
|
||||||
|
for label, value := range container.Labels {
|
||||||
|
results := execLabelRegexp.FindStringSubmatch(label)
|
||||||
|
if len(results) == 3 {
|
||||||
|
// We've got part of a new job
|
||||||
|
jobName, jobField := results[1], results[2]
|
||||||
|
if partJob, ok := execJobs[jobName]; ok {
|
||||||
|
// Partial exists, add the other value
|
||||||
|
partJob[jobField] = value
|
||||||
|
} else {
|
||||||
|
// No partial exists, add this part
|
||||||
|
execJobs[jobName] = map[string]string{
|
||||||
|
jobField: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for jobName, jobConfig := range execJobs {
|
||||||
|
schedule, ok := jobConfig["schedule"]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
shellCommand, ok := jobConfig["command"]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jobs = append(jobs, ContainerExecJob{
|
||||||
|
ContainerStartJob: ContainerStartJob{
|
||||||
|
client: client,
|
||||||
|
containerID: container.ID,
|
||||||
|
context: context.Background(),
|
||||||
|
schedule: schedule,
|
||||||
|
name: strings.Join(append(container.Names, jobName), "/"),
|
||||||
|
},
|
||||||
|
shellCommand: shellCommand,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -98,40 +249,39 @@ func QueryScheduledJobs(client ContainerClient) (jobs []ContainerStartJob) {
|
|||||||
|
|
||||||
// ScheduleJobs accepts a Cron instance and a list of jobs to schedule.
|
// ScheduleJobs accepts a Cron instance and a list of jobs to schedule.
|
||||||
// It then schedules the provided jobs
|
// It then schedules the provided jobs
|
||||||
func ScheduleJobs(c *cron.Cron, jobs []ContainerStartJob) {
|
func ScheduleJobs(c *cron.Cron, jobs []ContainerCronJob) {
|
||||||
// Fetch existing jobs from the cron
|
// Fetch existing jobs from the cron
|
||||||
existingJobs := map[string]cron.EntryID{}
|
existingJobs := map[string]cron.EntryID{}
|
||||||
for _, entry := range c.Entries() {
|
for _, entry := range c.Entries() {
|
||||||
existingJobs[entry.Job.(ContainerStartJob).UniqueName()] = entry.ID
|
// This should be safe since ContainerCronJob is the only type of job we use
|
||||||
|
existingJobs[entry.Job.(ContainerCronJob).UniqueName()] = entry.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
if _, ok := existingJobs[job.UniqueName()]; ok {
|
if _, ok := existingJobs[job.UniqueName()]; ok {
|
||||||
// Job already exists, remove it from existing jobs so we don't
|
// Job already exists, remove it from existing jobs so we don't
|
||||||
// unschedule it later
|
// unschedule it later
|
||||||
if Debug {
|
LogDebug("Job %s is already scheduled. Skipping", job.Name())
|
||||||
log.Printf("Job %s is already scheduled. Skipping", job.Name)
|
|
||||||
}
|
|
||||||
delete(existingJobs, job.UniqueName())
|
delete(existingJobs, job.UniqueName())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Job doesn't exist yet, schedule it
|
// Job doesn't exist yet, schedule it
|
||||||
_, err := c.AddJob(job.Schedule, job)
|
_, err := c.AddJob(job.Schedule(), job)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Printf(
|
log.Printf(
|
||||||
"Scheduled %s (%s) with schedule '%s'\n",
|
"Scheduled %s (%s) with schedule '%s'\n",
|
||||||
job.Name,
|
job.Name(),
|
||||||
job.ContainerID[:10],
|
job.UniqueName(),
|
||||||
job.Schedule,
|
job.Schedule(),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
// TODO: Track something for a healthcheck here
|
// TODO: Track something for a healthcheck here
|
||||||
log.Printf(
|
LogError(
|
||||||
"Error scheduling %s (%s) with schedule '%s'. %v\n",
|
"Could not schedule %s (%s) with schedule '%s'. %v\n",
|
||||||
job.Name,
|
job.Name(),
|
||||||
job.ContainerID[:10],
|
job.UniqueName(),
|
||||||
job.Schedule,
|
job.Schedule(),
|
||||||
err,
|
err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -154,7 +304,7 @@ func main() {
|
|||||||
var watchInterval time.Duration
|
var watchInterval time.Duration
|
||||||
flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes")
|
flag.DurationVar(&watchInterval, "watch", defaultWatchInterval, "Interval used to poll Docker for changes")
|
||||||
var showVersion = flag.Bool("version", false, "Display the version of dockron and exit")
|
var showVersion = flag.Bool("version", false, "Display the version of dockron and exit")
|
||||||
flag.BoolVar(&Debug, "debug", false, "Show debug logs")
|
flag.BoolVar(&DebugLevel, "debug", false, "Show debug logs")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// Print version if asked
|
// Print version if asked
|
||||||
|
359
main_test.go
359
main_test.go
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
dockerTypes "github.com/docker/docker/api/types"
|
dockerTypes "github.com/docker/docker/api/types"
|
||||||
@ -13,6 +14,9 @@ import (
|
|||||||
// FakeDockerClient is used to test without interracting with Docker
|
// FakeDockerClient is used to test without interracting with Docker
|
||||||
type FakeDockerClient struct {
|
type FakeDockerClient struct {
|
||||||
FakeContainers []dockerTypes.Container
|
FakeContainers []dockerTypes.Container
|
||||||
|
FakeExecIDResponse string
|
||||||
|
FakeContainerExecInspect dockerTypes.ContainerExecInspect
|
||||||
|
FakeContainerInspect dockerTypes.ContainerJSON
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerStart pretends to start a container
|
// ContainerStart pretends to start a container
|
||||||
@ -24,18 +28,29 @@ func (fakeClient *FakeDockerClient) ContainerList(context context.Context, optio
|
|||||||
return fakeClient.FakeContainers, nil
|
return fakeClient.FakeContainers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFakeDockerClient creates an empty client
|
func (fakeClient *FakeDockerClient) ContainerExecCreate(ctx context.Context, container string, config dockerTypes.ExecConfig) (dockerTypes.IDResponse, error) {
|
||||||
func NewFakeDockerClient() *FakeDockerClient {
|
return dockerTypes.IDResponse{ID: fakeClient.FakeExecIDResponse}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fakeClient *FakeDockerClient) ContainerExecStart(ctx context.Context, execID string, config dockerTypes.ExecStartCheck) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fakeClient *FakeDockerClient) ContainerExecInspect(ctx context.Context, execID string) (dockerTypes.ContainerExecInspect, error) {
|
||||||
|
return fakeClient.FakeContainerExecInspect, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fakeClient *FakeDockerClient) ContainerInspect(ctx context.Context, containerID string) (dockerTypes.ContainerJSON, error) {
|
||||||
|
return fakeClient.FakeContainerInspect, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newFakeDockerClient creates an empty client
|
||||||
|
func newFakeDockerClient() *FakeDockerClient {
|
||||||
return &FakeDockerClient{}
|
return &FakeDockerClient{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFakeDockerClientWithContainers creates a client with the provided containers
|
// errorUnequal checks that two values are equal and fails the test if not
|
||||||
func NewFakeDockerClientWithContainers(containers []dockerTypes.Container) *FakeDockerClient {
|
func errorUnequal(t *testing.T, expected interface{}, actual interface{}, message string) {
|
||||||
return &FakeDockerClient{FakeContainers: containers}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrorUnequal checks that two values are equal and fails the test if not
|
|
||||||
func ErrorUnequal(t *testing.T, expected interface{}, actual interface{}, message string) {
|
|
||||||
if expected != actual {
|
if expected != actual {
|
||||||
t.Errorf("%s Expected: %+v Actual: %+v", message, expected, actual)
|
t.Errorf("%s Expected: %+v Actual: %+v", message, expected, actual)
|
||||||
}
|
}
|
||||||
@ -44,17 +59,17 @@ func ErrorUnequal(t *testing.T, expected interface{}, actual interface{}, messag
|
|||||||
// TestQueryScheduledJobs checks that when querying the Docker client that we
|
// TestQueryScheduledJobs checks that when querying the Docker client that we
|
||||||
// create jobs for any containers with a dockron.schedule
|
// create jobs for any containers with a dockron.schedule
|
||||||
func TestQueryScheduledJobs(t *testing.T) {
|
func TestQueryScheduledJobs(t *testing.T) {
|
||||||
client := NewFakeDockerClient()
|
client := newFakeDockerClient()
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
fakeContainers []dockerTypes.Container
|
fakeContainers []dockerTypes.Container
|
||||||
expectedJobs []ContainerStartJob
|
expectedJobs []ContainerCronJob
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "No containers",
|
name: "No containers",
|
||||||
fakeContainers: []dockerTypes.Container{},
|
fakeContainers: []dockerTypes.Container{},
|
||||||
expectedJobs: []ContainerStartJob{},
|
expectedJobs: []ContainerCronJob{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "One container without schedule",
|
name: "One container without schedule",
|
||||||
@ -64,7 +79,7 @@ func TestQueryScheduledJobs(t *testing.T) {
|
|||||||
ID: "no_schedule_1",
|
ID: "no_schedule_1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{},
|
expectedJobs: []ContainerCronJob{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "One container with schedule",
|
name: "One container with schedule",
|
||||||
@ -77,13 +92,13 @@ func TestQueryScheduledJobs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -102,13 +117,101 @@ func TestQueryScheduledJobs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Incomplete exec job, schedule only",
|
||||||
|
fakeContainers: []dockerTypes.Container{
|
||||||
|
dockerTypes.Container{
|
||||||
|
Names: []string{"exec_job_1"},
|
||||||
|
ID: "exec_job_1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"dockron.test.schedule": "* * * * *",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedJobs: []ContainerCronJob{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Incomplete exec job, command only",
|
||||||
|
fakeContainers: []dockerTypes.Container{
|
||||||
|
dockerTypes.Container{
|
||||||
|
Names: []string{"exec_job_1"},
|
||||||
|
ID: "exec_job_1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"dockron.test.command": "date",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedJobs: []ContainerCronJob{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Complete exec job",
|
||||||
|
fakeContainers: []dockerTypes.Container{
|
||||||
|
dockerTypes.Container{
|
||||||
|
Names: []string{"exec_job_1"},
|
||||||
|
ID: "exec_job_1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"dockron.test.schedule": "* * * * *",
|
||||||
|
"dockron.test.command": "date",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedJobs: []ContainerCronJob{
|
||||||
|
ContainerExecJob{
|
||||||
|
ContainerStartJob: ContainerStartJob{
|
||||||
|
name: "exec_job_1/test",
|
||||||
|
containerID: "exec_job_1",
|
||||||
|
schedule: "* * * * *",
|
||||||
|
context: context.Background(),
|
||||||
|
client: client,
|
||||||
|
},
|
||||||
|
shellCommand: "date",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Dual exec jobs on single container",
|
||||||
|
fakeContainers: []dockerTypes.Container{
|
||||||
|
dockerTypes.Container{
|
||||||
|
Names: []string{"exec_job_1"},
|
||||||
|
ID: "exec_job_1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"dockron.test1.schedule": "* * * * *",
|
||||||
|
"dockron.test1.command": "date",
|
||||||
|
"dockron.test2.schedule": "* * * * *",
|
||||||
|
"dockron.test2.command": "date",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedJobs: []ContainerCronJob{
|
||||||
|
ContainerExecJob{
|
||||||
|
ContainerStartJob: ContainerStartJob{
|
||||||
|
name: "exec_job_1/test1",
|
||||||
|
containerID: "exec_job_1",
|
||||||
|
schedule: "* * * * *",
|
||||||
|
context: context.Background(),
|
||||||
|
client: client,
|
||||||
|
},
|
||||||
|
shellCommand: "date",
|
||||||
|
},
|
||||||
|
ContainerExecJob{
|
||||||
|
ContainerStartJob: ContainerStartJob{
|
||||||
|
name: "exec_job_1/test2",
|
||||||
|
containerID: "exec_job_1",
|
||||||
|
schedule: "* * * * *",
|
||||||
|
context: context.Background(),
|
||||||
|
client: client,
|
||||||
|
},
|
||||||
|
shellCommand: "date",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -123,12 +226,15 @@ func TestQueryScheduledJobs(t *testing.T) {
|
|||||||
client.FakeContainers = c.fakeContainers
|
client.FakeContainers = c.fakeContainers
|
||||||
|
|
||||||
jobs := QueryScheduledJobs(client)
|
jobs := QueryScheduledJobs(client)
|
||||||
|
// Sort so we can compare each list of jobs
|
||||||
|
sort.Slice(jobs, func(i, j int) bool {
|
||||||
|
return jobs[i].UniqueName() < jobs[j].UniqueName()
|
||||||
|
})
|
||||||
|
|
||||||
t.Logf("Expected jobs: %+v, Actual jobs: %+v", c.expectedJobs, jobs)
|
t.Logf("Expected jobs: %+v, Actual jobs: %+v", c.expectedJobs, jobs)
|
||||||
|
errorUnequal(t, len(c.expectedJobs), len(jobs), "Job lengths don't match")
|
||||||
ErrorUnequal(t, len(c.expectedJobs), len(jobs), "Job lengths don't match")
|
|
||||||
for i, job := range jobs {
|
for i, job := range jobs {
|
||||||
ErrorUnequal(t, c.expectedJobs[i], job, "Job value does not match")
|
errorUnequal(t, c.expectedJobs[i], job, "Job value does not match")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -142,82 +248,82 @@ func TestScheduleJobs(t *testing.T) {
|
|||||||
// Tests must be executed sequentially!
|
// Tests must be executed sequentially!
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
queriedJobs []ContainerStartJob
|
queriedJobs []ContainerCronJob
|
||||||
expectedJobs []ContainerStartJob
|
expectedJobs []ContainerCronJob
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "No containers",
|
name: "No containers",
|
||||||
queriedJobs: []ContainerStartJob{},
|
queriedJobs: []ContainerCronJob{},
|
||||||
expectedJobs: []ContainerStartJob{},
|
expectedJobs: []ContainerCronJob{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "One container with schedule",
|
name: "One container with schedule",
|
||||||
queriedJobs: []ContainerStartJob{
|
queriedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Add a second job",
|
name: "Add a second job",
|
||||||
queriedJobs: []ContainerStartJob{
|
queriedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Replace job 1",
|
name: "Replace job 1",
|
||||||
queriedJobs: []ContainerStartJob{
|
queriedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1_prime",
|
containerID: "has_schedule_1_prime",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1_prime",
|
containerID: "has_schedule_1_prime",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -234,9 +340,9 @@ func TestScheduleJobs(t *testing.T) {
|
|||||||
scheduledEntries := croner.Entries()
|
scheduledEntries := croner.Entries()
|
||||||
t.Logf("Cron entries: %+v", scheduledEntries)
|
t.Logf("Cron entries: %+v", scheduledEntries)
|
||||||
|
|
||||||
ErrorUnequal(t, len(c.expectedJobs), len(scheduledEntries), "Job and entry lengths don't match")
|
errorUnequal(t, len(c.expectedJobs), len(scheduledEntries), "Job and entry lengths don't match")
|
||||||
for i, entry := range scheduledEntries {
|
for i, entry := range scheduledEntries {
|
||||||
ErrorUnequal(t, c.expectedJobs[i], entry.Job, "Job value does not match entry")
|
errorUnequal(t, c.expectedJobs[i], entry.Job, "Job value does not match entry")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -248,17 +354,17 @@ func TestScheduleJobs(t *testing.T) {
|
|||||||
// TestDoLoop is close to an integration test that checks the main loop logic
|
// TestDoLoop is close to an integration test that checks the main loop logic
|
||||||
func TestDoLoop(t *testing.T) {
|
func TestDoLoop(t *testing.T) {
|
||||||
croner := cron.New()
|
croner := cron.New()
|
||||||
client := NewFakeDockerClient()
|
client := newFakeDockerClient()
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
fakeContainers []dockerTypes.Container
|
fakeContainers []dockerTypes.Container
|
||||||
expectedJobs []ContainerStartJob
|
expectedJobs []ContainerCronJob
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "No containers",
|
name: "No containers",
|
||||||
fakeContainers: []dockerTypes.Container{},
|
fakeContainers: []dockerTypes.Container{},
|
||||||
expectedJobs: []ContainerStartJob{},
|
expectedJobs: []ContainerCronJob{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "One container without schedule",
|
name: "One container without schedule",
|
||||||
@ -268,7 +374,7 @@ func TestDoLoop(t *testing.T) {
|
|||||||
ID: "no_schedule_1",
|
ID: "no_schedule_1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{},
|
expectedJobs: []ContainerCronJob{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "One container with schedule",
|
name: "One container with schedule",
|
||||||
@ -281,13 +387,13 @@ func TestDoLoop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -306,13 +412,13 @@ func TestDoLoop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -334,20 +440,20 @@ func TestDoLoop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1",
|
containerID: "has_schedule_1",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -369,20 +475,53 @@ func TestDoLoop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedJobs: []ContainerStartJob{
|
expectedJobs: []ContainerCronJob{
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_2",
|
name: "has_schedule_2",
|
||||||
ContainerID: "has_schedule_2",
|
containerID: "has_schedule_2",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
},
|
},
|
||||||
ContainerStartJob{
|
ContainerStartJob{
|
||||||
Name: "has_schedule_1",
|
name: "has_schedule_1",
|
||||||
ContainerID: "has_schedule_1_prime",
|
containerID: "has_schedule_1_prime",
|
||||||
Schedule: "* * * * *",
|
schedule: "* * * * *",
|
||||||
Context: context.Background(),
|
context: context.Background(),
|
||||||
Client: client,
|
client: client,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Remove second container and add exec to first",
|
||||||
|
fakeContainers: []dockerTypes.Container{
|
||||||
|
dockerTypes.Container{
|
||||||
|
Names: []string{"has_schedule_1"},
|
||||||
|
ID: "has_schedule_1_prime",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"dockron.schedule": "* * * * *",
|
||||||
|
"dockron.test.schedule": "* * * * *",
|
||||||
|
"dockron.test.command": "date",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedJobs: []ContainerCronJob{
|
||||||
|
ContainerStartJob{
|
||||||
|
name: "has_schedule_1",
|
||||||
|
containerID: "has_schedule_1_prime",
|
||||||
|
schedule: "* * * * *",
|
||||||
|
context: context.Background(),
|
||||||
|
client: client,
|
||||||
|
},
|
||||||
|
ContainerExecJob{
|
||||||
|
ContainerStartJob: ContainerStartJob{
|
||||||
|
name: "has_schedule_1/test",
|
||||||
|
containerID: "has_schedule_1_prime",
|
||||||
|
schedule: "* * * * *",
|
||||||
|
context: context.Background(),
|
||||||
|
client: client,
|
||||||
|
},
|
||||||
|
shellCommand: "date",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -405,9 +544,9 @@ func TestDoLoop(t *testing.T) {
|
|||||||
scheduledEntries := croner.Entries()
|
scheduledEntries := croner.Entries()
|
||||||
t.Logf("Cron entries: %+v", scheduledEntries)
|
t.Logf("Cron entries: %+v", scheduledEntries)
|
||||||
|
|
||||||
ErrorUnequal(t, len(c.expectedJobs), len(scheduledEntries), "Job and entry lengths don't match")
|
errorUnequal(t, len(c.expectedJobs), len(scheduledEntries), "Job and entry lengths don't match")
|
||||||
for i, entry := range scheduledEntries {
|
for i, entry := range scheduledEntries {
|
||||||
ErrorUnequal(t, c.expectedJobs[i], entry.Job, "Job value does not match entry")
|
errorUnequal(t, c.expectedJobs[i], entry.Job, "Job value does not match entry")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user