This commit is contained in:
parent
d4782bfae2
commit
bdbd9fb722
15
job.go
15
job.go
@ -225,12 +225,25 @@ func (j Job) Healthy() (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (j Job) Run() {
|
func (j Job) Run() {
|
||||||
|
result := JobResult{
|
||||||
|
JobName: j.Name,
|
||||||
|
JobType: "backup",
|
||||||
|
Success: true,
|
||||||
|
LastError: nil,
|
||||||
|
Message: "",
|
||||||
|
}
|
||||||
|
|
||||||
if err := j.RunBackup(); err != nil {
|
if err := j.RunBackup(); err != nil {
|
||||||
j.healthy = false
|
j.healthy = false
|
||||||
j.lastErr = err
|
j.lastErr = err
|
||||||
|
|
||||||
j.Logger().Printf("ERROR: Backup failed: %v", err)
|
j.Logger().Printf("ERROR: Backup failed: %s", err.Error())
|
||||||
|
|
||||||
|
result.Success = false
|
||||||
|
result.LastError = err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JobComplete(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j Job) NewRestic() *Restic {
|
func (j Job) NewRestic() *Restic {
|
||||||
|
5
main.go
5
main.go
@ -136,6 +136,7 @@ func main() {
|
|||||||
backup := flag.String("backup", "", "Run backup jobs now. Names are comma separated and `all` will run all.")
|
backup := flag.String("backup", "", "Run backup jobs now. Names are comma separated and `all` will run all.")
|
||||||
restore := flag.String("restore", "", "Run restore jobs now. Names are comma separated and `all` will run all.")
|
restore := flag.String("restore", "", "Run restore jobs now. Names are comma separated and `all` will run all.")
|
||||||
once := flag.Bool("once", false, "Run jobs specified using -backup and -restore once and exit")
|
once := flag.Bool("once", false, "Run jobs specified using -backup and -restore once and exit")
|
||||||
|
healthCheckAddr := flag.String("addr", "0.0.0.0:8080", "address to bind health check API")
|
||||||
flag.StringVar(&JobBaseDir, "base-dir", JobBaseDir, "Base dir to create intermediate job files like SQL dumps.")
|
flag.StringVar(&JobBaseDir, "base-dir", JobBaseDir, "Base dir to create intermediate job files like SQL dumps.")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
@ -174,6 +175,10 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_ = RunHTTPHandlers(*healthCheckAddr)
|
||||||
|
}()
|
||||||
|
|
||||||
// TODO: Add healthcheck handler using Job.Healthy()
|
// TODO: Add healthcheck handler using Job.Healthy()
|
||||||
if err := ScheduleAndRunJobs(jobs); err != nil {
|
if err := ScheduleAndRunJobs(jobs); err != nil {
|
||||||
log.Fatalf("failed running jobs: %v", err)
|
log.Fatalf("failed running jobs: %v", err)
|
||||||
|
29
restic.go
29
restic.go
@ -241,6 +241,33 @@ func (rcmd Restic) BuildEnv() []string {
|
|||||||
return envList
|
return envList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ResticError struct {
|
||||||
|
OriginalError error
|
||||||
|
Command string
|
||||||
|
Output []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewResticError(command string, output []string, originalError error) *ResticError {
|
||||||
|
return &ResticError{
|
||||||
|
OriginalError: originalError,
|
||||||
|
Command: command,
|
||||||
|
Output: output,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ResticError) Error() string {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"error running restic %s: %s\nOutput:\n%s",
|
||||||
|
e.Command,
|
||||||
|
e.OriginalError,
|
||||||
|
strings.Join(e.Output, "\n"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ResticError) Unwrap() error {
|
||||||
|
return e.OriginalError
|
||||||
|
}
|
||||||
|
|
||||||
func (rcmd Restic) RunRestic(command string, options CommandOptions, commandArgs ...string) ([]string, error) {
|
func (rcmd Restic) RunRestic(command string, options CommandOptions, commandArgs ...string) ([]string, error) {
|
||||||
args := []string{}
|
args := []string{}
|
||||||
if rcmd.GlobalOpts != nil {
|
if rcmd.GlobalOpts != nil {
|
||||||
@ -265,7 +292,7 @@ func (rcmd Restic) RunRestic(command string, options CommandOptions, commandArgs
|
|||||||
responseErr = ErrRepoNotFound
|
responseErr = ErrRepoNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return output.Lines, fmt.Errorf("error running restic %s: %w", command, responseErr)
|
return output.Lines, NewResticError(command, output.Lines, responseErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return output.Lines, nil
|
return output.Lines, nil
|
||||||
|
76
scheduler.go
76
scheduler.go
@ -1,40 +1,102 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var jobResultsLock = sync.Mutex{}
|
||||||
|
var jobResults = map[string]JobResult{}
|
||||||
|
|
||||||
|
type JobResult struct {
|
||||||
|
JobName string
|
||||||
|
JobType string
|
||||||
|
Success bool
|
||||||
|
LastError error
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r JobResult) Format() string {
|
||||||
|
return fmt.Sprintf("%s %s ok? %v\n\n%+v", r.JobName, r.JobType, r.Success, r.LastError)
|
||||||
|
}
|
||||||
|
|
||||||
|
func JobComplete(result JobResult) {
|
||||||
|
fmt.Printf("Completed job %+v\n", result)
|
||||||
|
|
||||||
|
jobResultsLock.Lock()
|
||||||
|
jobResults[result.JobName] = result
|
||||||
|
jobResultsLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeJobResult(writer http.ResponseWriter, jobName string) {
|
||||||
|
if jobResult, ok := jobResults[jobName]; ok {
|
||||||
|
if !jobResult.Success {
|
||||||
|
writer.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobResult.Message = jobResult.LastError.Error()
|
||||||
|
if err := json.NewEncoder(writer).Encode(jobResult); err != nil {
|
||||||
|
_, _ = writer.Write([]byte(fmt.Sprintf("failed writing json for %s", jobResult.Format())))
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.Header().Set("Content-Type", "application/json")
|
||||||
|
} else {
|
||||||
|
writer.WriteHeader(http.StatusNotFound)
|
||||||
|
_, _ = writer.Write([]byte("{\"Message\": \"Unknown job\"}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthHandleFunc(writer http.ResponseWriter, request *http.Request) {
|
||||||
|
query := request.URL.Query()
|
||||||
|
if jobName, ok := query["job"]; ok {
|
||||||
|
writeJobResult(writer, jobName[0])
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _ = writer.Write([]byte("ok"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunHTTPHandlers(addr string) error {
|
||||||
|
http.HandleFunc("/health", healthHandleFunc)
|
||||||
|
|
||||||
|
return fmt.Errorf("error on healthcheck: %w", http.ListenAndServe(addr, nil))
|
||||||
|
}
|
||||||
|
|
||||||
func ScheduleAndRunJobs(jobs []Job) error {
|
func ScheduleAndRunJobs(jobs []Job) error {
|
||||||
signalChan := make(chan os.Signal, 1)
|
signalChan := make(chan os.Signal, 1)
|
||||||
|
|
||||||
signal.Notify(signalChan,
|
signal.Notify(
|
||||||
|
signalChan,
|
||||||
syscall.SIGINT,
|
syscall.SIGINT,
|
||||||
syscall.SIGTERM,
|
syscall.SIGTERM,
|
||||||
syscall.SIGQUIT,
|
syscall.SIGQUIT,
|
||||||
)
|
)
|
||||||
|
|
||||||
runner := cron.New()
|
scheduler := cron.New()
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
fmt.Println("Scheduling", job.Name)
|
fmt.Println("Scheduling", job.Name)
|
||||||
|
|
||||||
if _, err := runner.AddJob(job.Schedule, job); err != nil {
|
if _, err := scheduler.AddJob(job.Schedule, job); err != nil {
|
||||||
return fmt.Errorf("Error scheduling job %s: %w", job.Name, err)
|
return fmt.Errorf("error scheduling job %s: %w", job.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
runner.Start()
|
scheduler.Start()
|
||||||
|
|
||||||
switch <-signalChan {
|
switch <-signalChan {
|
||||||
case syscall.SIGINT:
|
case syscall.SIGINT:
|
||||||
fmt.Println("Stopping now...")
|
fmt.Println("Stopping now...")
|
||||||
|
|
||||||
defer runner.Stop()
|
defer scheduler.Stop()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
case syscall.SIGTERM:
|
case syscall.SIGTERM:
|
||||||
@ -44,7 +106,7 @@ func ScheduleAndRunJobs(jobs []Job) error {
|
|||||||
fmt.Println("Stopping after running jobs complete...")
|
fmt.Println("Stopping after running jobs complete...")
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
ctx := runner.Stop()
|
ctx := scheduler.Stop()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
4
tasks.go
4
tasks.go
@ -223,7 +223,7 @@ func (t BackupFilesTask) RunBackup(cfg TaskConfig) error {
|
|||||||
|
|
||||||
if err := cfg.Restic.Backup(cfg.BackupPaths, *t.BackupOpts); err != nil {
|
if err := cfg.Restic.Backup(cfg.BackupPaths, *t.BackupOpts); err != nil {
|
||||||
err = fmt.Errorf("failed backing up paths: %w", err)
|
err = fmt.Errorf("failed backing up paths: %w", err)
|
||||||
cfg.Logger.Fatal(err)
|
cfg.Logger.Print(err)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -239,7 +239,7 @@ func (t BackupFilesTask) RunRestore(cfg TaskConfig) error {
|
|||||||
// TODO: Make the snapshot configurable
|
// TODO: Make the snapshot configurable
|
||||||
if err := cfg.Restic.Restore("latest", *t.RestoreOpts); err != nil {
|
if err := cfg.Restic.Restore("latest", *t.RestoreOpts); err != nil {
|
||||||
err = fmt.Errorf("failed restoring paths: %w", err)
|
err = fmt.Errorf("failed restoring paths: %w", err)
|
||||||
cfg.Logger.Fatal(err)
|
cfg.Logger.Print(err)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,40 +1,37 @@
|
|||||||
job "TestBackup" {
|
job "TestBackup" {
|
||||||
schedule = "1 * * * *"
|
schedule = "* * * * *"
|
||||||
|
|
||||||
config {
|
config {
|
||||||
repo = "./backups"
|
repo = "test/data/backups"
|
||||||
passphrase = "supersecret"
|
passphrase = "supersecret"
|
||||||
|
|
||||||
options {
|
options {
|
||||||
CacheDir = "./cache"
|
CacheDir = "test/data/cache"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
task "before script" {
|
task "create test data" {
|
||||||
script {
|
pre_script {
|
||||||
on_backup = "echo before backup!"
|
on_backup = "echo test > test/data/data/test.txt"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
task "backup" {
|
task "backup phases" {
|
||||||
backup {
|
pre_script {
|
||||||
files = [
|
on_backup = "echo 'pre-backup'"
|
||||||
"./data"
|
on_restore = "echo 'pre-restore'"
|
||||||
]
|
}
|
||||||
|
|
||||||
backup_opts {
|
post_script {
|
||||||
Tags = ["foo"]
|
on_backup = "echo 'post-backup'"
|
||||||
}
|
on_restore = "echo 'post-restore'"
|
||||||
|
|
||||||
restore_opts {
|
|
||||||
Target = "."
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
task "after script" {
|
backup {
|
||||||
script {
|
paths = ["./test/data/data"]
|
||||||
on_backup = "echo after backup!"
|
restore_opts {
|
||||||
|
Target = "."
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user