129 lines
3.8 KiB
Python
129 lines
3.8 KiB
Python
|
#! /usr/bin/env python3
|
||
|
from os import environ
|
||
|
from time import sleep
|
||
|
from typing import Any
|
||
|
from typing import cast
|
||
|
from argparse import ArgumentParser
|
||
|
|
||
|
import requests
|
||
|
|
||
|
|
||
|
NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646")
|
||
|
NOMAD_TOKEN = environ.get("NOMAD_TOKEN")
|
||
|
|
||
|
|
||
|
def nomad_req(
    *path: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    method="GET",
    timeout: float | None = 30,
) -> list[dict[str, Any]] | dict[str, Any] | str:
    """Send one request to the Nomad HTTP API and return the decoded body.

    Args:
        *path: URL path segments; they are joined with "/" and appended to
            ``{NOMAD_ADDR}/v1/``.
        params: Optional query-string parameters.
        data: Optional request body, sent JSON-encoded.
        method: HTTP method name passed to ``requests.request``.
        timeout: Seconds to wait for the server before giving up. Pass
            ``None`` to wait indefinitely (the previous behaviour).

    Returns:
        The parsed JSON body when the response is valid JSON, otherwise the
        raw response text.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (including a timeout) or the server answers with an
            error status. For error statuses the response body is printed
            before the exception propagates.
    """
    headers = {
        "Content-Type": "application/json",
    }
    # Only send the ACL token header when a token is configured.
    if NOMAD_TOKEN:
        headers["X-Nomad-Token"] = NOMAD_TOKEN

    # Strip a trailing slash so a NOMAD_ADDR such as "http://host:4646/"
    # does not produce a "//v1/..." URL.
    base_url = NOMAD_ADDR.rstrip("/")

    response = requests.request(
        method,
        f"{base_url}/v1/{'/'.join(path)}",
        params=params,
        json=data,
        headers=headers,
        # Without a timeout requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    try:
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Show the server's error body, then re-raise with the original
        # traceback intact.
        print(response.text)
        raise

    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        # Body is not JSON; hand back the plain text instead.
        return response.text
|
||
|
|
||
|
|
||
|
def wait_for_job_alloc_status(job_id: str, status: str):
    """Block until every allocation of a job reports the wanted client status.

    The job's allocations are fetched through ``nomad_req``; while any of
    them has a ``ClientStatus`` different from ``status`` a progress line is
    printed and the check is repeated after a 5 second pause. A job with no
    allocations satisfies the condition immediately.

    Args:
        job_id: ID of the job whose allocations are polled.
        status: The ``ClientStatus`` value every allocation must reach.
    """
    while True:
        response = nomad_req("job", job_id, "allocations")
        job_allocs = cast(list[dict[str, Any]], response)

        # Stop as soon as no allocation is in a different state.
        still_waiting = any(
            entry["ClientStatus"] != status for entry in job_allocs
        )
        if not still_waiting:
            return

        print(f"Waiting for all allocs to reach {status}...")
        sleep(5)
|
||
|
|
||
|
|
||
|
def wait_for_eval_status(eval_id: str, status: str):
    """Block until an evaluation reports the wanted status.

    The evaluation is fetched through ``nomad_req``; while its ``Status``
    field differs from ``status`` a progress line is printed and the lookup
    is repeated after a 5 second pause.

    Args:
        eval_id: ID of the evaluation to poll.
        status: The ``Status`` value to wait for.
    """
    while True:
        response = nomad_req("evaluation", eval_id)
        evaluation = cast(dict[str, Any], response)

        if evaluation["Status"] == status:
            return

        print(f"Waiting for eval to reach {status}...")
        sleep(5)
|
||
|
|
||
|
|
||
|
# Command-line interface: which service to act on, and how.
parser = ArgumentParser(
    description="Execute one off backups and restores of services",
)
parser.add_argument("service_name", help="Name of the service to backup or restore")
parser.add_argument(
    "-a",
    "--action",
    default="backup",
    choices=("backup", "restore"),
    help="Action to take, backup or restore",
)
parser.add_argument(
    "-s",
    "--snapshot",
    default="latest",
    help="Backup snapshot to restore, if restore is the chosen action",
)
parser.add_argument(
    "-x",
    "--extra-safe",
    action="store_true",
    help="Perform extra safe backup or restore by stoping target job first",
)
args = parser.parse_args()

# Look up the service registration to learn which node and job it belongs to.
# NOTE(review): "choose" presumably limits the result to one instance; confirm
# against the Nomad services API.
service_name = args.service_name
service_info = nomad_req("service", service_name, params={"choose": "1|backups"})

if not service_info:
    print(f"Could not find service {service_name}")
    raise SystemExit(1)

service_info = cast(list[dict[str, Any]], service_info)
node_id = service_info[0]["NodeID"]
job_id = service_info[0]["JobID"]

# The one-off backup job is named after the node the service runs on.
node = nomad_req("node", node_id)
node = cast(dict[str, Any], node)
node_name = node["Name"]
backup_job_name = f"backup-oneoff-{node_name}"

backup_job = nomad_req("job", backup_job_name)
if not backup_job:
    print(f"Could not find backup job {backup_job_name} for {service_name}")
    # Abort here: without a backup job there is nothing to dispatch, and
    # continuing would stop the target job (with --extra-safe) for no reason
    # before failing on the missing job data.
    raise SystemExit(1)

if args.extra_safe:
    # Stop the target job and wait until all of its allocations have finished
    # before touching its data.
    print("Stopping job allocs")
    stop_job = nomad_req("job", job_id, method="DELETE")
    print(stop_job)
    wait_for_job_alloc_status(job_id, "complete")

backup_job = cast(dict[str, Any], backup_job)
backup_job_id = backup_job["ID"]

# Dispatch the backup job with the requested action passed as metadata.
dispatch = nomad_req(
    "job",
    backup_job_id,
    "dispatch",
    data={
        "Payload": None,
        "Meta": {
            "job_name": service_name,
            "task": args.action,
            "snapshot": args.snapshot,
        },
    },
    method="POST",
)
dispatch = cast(dict[str, Any], dispatch)
print(dispatch)

if args.extra_safe:
    print(f"Wait for {args.action} to finish")
    wait_for_eval_status(dispatch["EvalID"], "complete")

    # Report the action that actually ran (backup or restore).
    print(f"{args.action.capitalize()} complete. Verify success and restart job")
    # If auto restarting, get versions and "revert" to version n-1 since n will be the recently stopped version
|