homelab-nomad/scripts/nomad_backup_job.py
Ian Fijolek 480fcf144c Improvements for future restore tests
A new mode to deploy without running automatic backups so that
I can test restoration without writing a blank backup.
2025-02-24 09:54:59 -08:00

166 lines
4.9 KiB
Python
Executable File

#! /usr/bin/env python3
from os import environ
from time import sleep
from typing import Any
from typing import cast
from argparse import ArgumentParser, Namespace
import requests
NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646")
NOMAD_TOKEN = environ.get("NOMAD_TOKEN")
def nomad_req(
*path: str,
params: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
method="GET",
) -> list[dict[str, Any]] | dict[str, Any] | str:
headers = {
"Content-Type": "application/json",
}
if NOMAD_TOKEN:
headers["X-Nomad-Token"] = NOMAD_TOKEN
response = requests.request(
method,
f"{NOMAD_ADDR}/v1/{'/'.join(path)}",
params=params,
json=data,
headers=headers,
)
try:
response.raise_for_status()
except requests.exceptions.RequestException as ex:
print(response.text)
raise ex
try:
return response.json()
except requests.exceptions.JSONDecodeError:
return response.text
def wait_for_job_alloc_status(job_id: str, status: str):
allocs = nomad_req("job", job_id, "allocations")
allocs = cast(list[dict[str, Any]], allocs)
while not all(alloc["ClientStatus"] == status for alloc in allocs):
print(f"Waiting for all allocs to reach {status}...")
sleep(5)
allocs = nomad_req("job", job_id, "allocations")
allocs = cast(list[dict[str, Any]], allocs)
def wait_for_eval_status(eval_id: str, status: str):
eval = nomad_req("evaluation", eval_id)
eval = cast(dict[str, Any], eval)
while eval["Status"] != status:
print(f"Waiting for eval to reach {status}...")
sleep(5)
eval = nomad_req("evaluation", eval_id)
eval = cast(dict[str, Any], eval)
def restart_job(job_id: str) -> str|None:
job_versions = nomad_req("job", job_id, "versions")
job_versions = cast(dict[str, Any], job_versions)
latest_stable_version: int = -1
for version in job_versions["Versions"]:
if version["Stable"] and version["Version"] > latest_stable_version:
latest_stable_version = version["Version"]
if latest_stable_version == -1:
print("No stable versions found")
return None
print(f"Reverting to version {latest_stable_version}")
revert = nomad_req("job", job_id, "revert", data={"JobVersion": latest_stable_version}, method="POST")
revert = cast(dict[str, Any], revert)
return revert["EvalID"]
def parse_arguments() -> Namespace:
parser = ArgumentParser(
description="Execute one off backups and restores of services",
)
parser.add_argument("service_name", help="Name of the service to backup or restore")
parser.add_argument("-a", "--action", default="backup", choices=("backup", "restore"), help="Action to take, backup or restore")
parser.add_argument("-s", "--snapshot", default="latest", help="Backup snapshot to restore, if restore is the chosen action")
parser.add_argument("-x", "--extra-safe", action="store_true", help="Perform extra safe backup or restore by stoping target job first")
args = parser.parse_args()
return args
def main() -> int:
args = parse_arguments()
service_name = args.service_name
service_info = nomad_req("service", service_name, params={"choose": "1|backups"})
if not service_info:
print(f"Could not find service {service_name}")
return 1
service_info = cast(list[dict[str, Any]], service_info)
node_id = service_info[0]["NodeID"]
job_id = service_info[0]["JobID"]
node = nomad_req("node", node_id)
node = cast(dict[str, Any], node)
node_name = node["Name"]
backup_job_name = f"backup-oneoff-{node_name}"
backup_job = nomad_req("job", backup_job_name)
if not backup_job:
print(f"Could not find backup job {backup_job_name} for {service_name}")
if args.extra_safe:
print("Stopping job allocs")
_ = nomad_req("job", job_id, method="DELETE")
wait_for_job_alloc_status(job_id, "complete")
backup_job = cast(dict[str, Any], backup_job)
backup_job_id = backup_job["ID"]
dispatch = nomad_req(
"job",
backup_job_id,
"dispatch",
data={
"Payload": None,
"Meta": {
"job_name": service_name,
"task": args.action,
"snapshot": args.snapshot,
},
},
method="POST",
)
dispatch = cast(dict[str, Any], dispatch)
if args.extra_safe:
print(f"Wait for {args.action} to finish")
wait_for_eval_status(dispatch["EvalID"], "complete")
print(f"{args.action.capitalize()} complete. Verify success and restart job")
revert_eval_id = restart_job(job_id)
if revert_eval_id:
wait_for_eval_status(revert_eval_id, "running")
else:
print("No stable versions to revert to")
return 0
if __name__ == "__main__":
exit(main())