#! /usr/bin/env python3
"""Execute one-off backups and restores of services running on Nomad.

The script looks up a service, finds the ``backup-oneoff-<node name>`` job for
the node the service runs on, and dispatches it with the requested action.
With ``--extra-safe`` the target job is stopped first and reverted to its
latest stable version afterwards.

Configuration is read from the environment:

* ``NOMAD_ADDR``: base URL of the Nomad HTTP API (default
  ``http://127.0.0.1:4646``).
* ``NOMAD_TOKEN``: optional ACL token sent as ``X-Nomad-Token``.
"""
from os import environ
from time import sleep
from typing import Any
from typing import cast
from argparse import ArgumentParser, Namespace

import requests

NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646")
NOMAD_TOKEN = environ.get("NOMAD_TOKEN")

# Seconds to sleep between polls while waiting on allocations or evaluations.
_POLL_SECONDS = 5

# Evaluation statuses that mean the evaluation will not make further progress
# towards "complete" (per the Nomad evaluations API documentation).
_EVAL_FAILURE_STATUSES = frozenset({"failed", "canceled"})


def nomad_req(
    *path: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    method: str = "GET",
    timeout: float = 30.0,
) -> list[dict[str, Any]] | dict[str, Any] | str:
    """Send a request to the Nomad HTTP API and return the decoded response.

    Args:
        *path: Path segments joined with ``/`` and appended to
            ``{NOMAD_ADDR}/v1/``.
        params: Optional query-string parameters.
        data: Optional request body, sent as JSON.
        method: HTTP method to use.
        timeout: Seconds to wait for the server before giving up, so a dead
            or unreachable server cannot hang the script forever.

    Returns:
        The parsed JSON body, or the raw response text when the body is not
        valid JSON.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts, or a non-success HTTP status. For HTTP errors the
            response body is printed before the exception propagates.
    """
    headers = {
        "Content-Type": "application/json",
    }
    if NOMAD_TOKEN:
        headers["X-Nomad-Token"] = NOMAD_TOKEN

    response = requests.request(
        method,
        f"{NOMAD_ADDR}/v1/{'/'.join(path)}",
        params=params,
        json=data,
        headers=headers,
        timeout=timeout,
    )

    try:
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Surface the server's error body, then re-raise with the original
        # traceback intact.
        print(response.text)
        raise

    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        return response.text


def wait_for_job_alloc_status(job_id: str, status: str) -> None:
    """Block until every allocation of ``job_id`` has ``ClientStatus == status``.

    Polls the job's allocations every ``_POLL_SECONDS`` seconds. Returns
    immediately when the job has no allocations.
    """
    while True:
        allocs = cast(
            list[dict[str, Any]],
            nomad_req("job", job_id, "allocations"),
        )
        if all(alloc["ClientStatus"] == status for alloc in allocs):
            return

        print(f"Waiting for all allocs to reach {status}...")
        sleep(_POLL_SECONDS)


def wait_for_eval_status(eval_id: str, status: str) -> None:
    """Block until evaluation ``eval_id`` reports ``Status == status``.

    Polls the evaluation every ``_POLL_SECONDS`` seconds.

    Raises:
        RuntimeError: If the evaluation ends as failed or canceled while a
            different status was requested, since it would otherwise be
            polled forever.
    """
    while True:
        evaluation = cast(dict[str, Any], nomad_req("evaluation", eval_id))
        current = evaluation["Status"]
        if current == status:
            return
        if current in _EVAL_FAILURE_STATUSES:
            raise RuntimeError(
                f"Evaluation {eval_id} ended with status {current!r} "
                f"while waiting for {status!r}"
            )

        print(f"Waiting for eval to reach {status}...")
        sleep(_POLL_SECONDS)


def restart_job(job_id: str) -> str | None:
    """Revert ``job_id`` to its highest-numbered stable version.

    Returns:
        The ``EvalID`` of the revert request, or ``None`` when the job has no
        version marked stable.
    """
    job_versions = cast(dict[str, Any], nomad_req("job", job_id, "versions"))

    stable_versions = [
        version["Version"]
        for version in job_versions["Versions"]
        if version["Stable"]
    ]
    if not stable_versions:
        print("No stable versions found")
        return None

    latest_stable_version: int = max(stable_versions)
    print(f"Reverting to version {latest_stable_version}")

    revert = nomad_req(
        "job",
        job_id,
        "revert",
        data={"JobVersion": latest_stable_version},
        method="POST",
    )
    revert = cast(dict[str, Any], revert)

    return revert["EvalID"]


def parse_arguments() -> Namespace:
    """Parse the command-line arguments for the backup/restore script."""
    parser = ArgumentParser(
        description="Execute one off backups and restores of services",
    )

    parser.add_argument(
        "service_name",
        help="Name of the service to backup or restore",
    )
    parser.add_argument(
        "-a",
        "--action",
        default="backup",
        choices=("backup", "restore"),
        help="Action to take, backup or restore",
    )
    parser.add_argument(
        "-s",
        "--snapshot",
        default="latest",
        help="Backup snapshot to restore, if restore is the chosen action",
    )
    parser.add_argument(
        "-x",
        "--extra-safe",
        action="store_true",
        help="Perform extra safe backup or restore by stoping target job first",
    )

    return parser.parse_args()


def main() -> int:
    """Dispatch a one-off backup or restore for the requested service.

    Returns:
        ``0`` on success, ``1`` when the service or its backup job cannot be
        found.
    """
    args = parse_arguments()
    service_name = args.service_name

    service_info = nomad_req(
        "service",
        service_name,
        params={"choose": "1|backups"},
    )
    if not service_info:
        print(f"Could not find service {service_name}")
        return 1

    service_info = cast(list[dict[str, Any]], service_info)
    node_id = service_info[0]["NodeID"]
    job_id = service_info[0]["JobID"]

    node = cast(dict[str, Any], nomad_req("node", node_id))
    node_name = node["Name"]

    backup_job_name = f"backup-oneoff-{node_name}"
    backup_job = nomad_req("job", backup_job_name)
    if not backup_job:
        print(f"Could not find backup job {backup_job_name} for {service_name}")
        # Nothing can be dispatched without the backup job; bail out rather
        # than failing later when its ID is read.
        return 1

    if args.extra_safe:
        print("Stopping job allocs")
        _ = nomad_req("job", job_id, method="DELETE")
        wait_for_job_alloc_status(job_id, "complete")

    backup_job = cast(dict[str, Any], backup_job)
    backup_job_id = backup_job["ID"]

    dispatch = nomad_req(
        "job",
        backup_job_id,
        "dispatch",
        data={
            "Payload": None,
            "Meta": {
                "job_name": service_name,
                "task": args.action,
                "snapshot": args.snapshot,
            },
        },
        method="POST",
    )
    dispatch = cast(dict[str, Any], dispatch)

    if args.extra_safe:
        print(f"Wait for {args.action} to finish")
        # NOTE(review): this waits on the dispatch evaluation, not on the
        # dispatched job's allocations. Confirm this is enough to treat the
        # backup/restore itself as finished.
        wait_for_eval_status(dispatch["EvalID"], "complete")

        print(f"{args.action.capitalize()} complete. Verify success and restart job")
        revert_eval_id = restart_job(job_id)
        if revert_eval_id:
            # Evaluations are never "running" per the Nomad API docs (that is
            # an allocation status), so wait for the evaluation to complete.
            wait_for_eval_status(revert_eval_id, "complete")
        else:
            print("No stable versions to revert to")

    return 0


if __name__ == "__main__":
    # `exit` is injected by the `site` module and is not guaranteed to exist;
    # SystemExit is always available.
    raise SystemExit(main())