#! /usr/bin/env python3
"""Execute one-off backups and restores of services through the Nomad HTTP API.

The script looks up the node running a service, finds that node's
``backup-oneoff-<node name>`` job and dispatches it with the requested action.
With ``--extra-safe`` the target job is stopped before the dispatch.

Configuration is read from the environment:

* ``NOMAD_ADDR``  - base URL of the Nomad API (default ``http://127.0.0.1:4646``).
* ``NOMAD_TOKEN`` - optional token sent as the ``X-Nomad-Token`` header.
"""
import sys
from argparse import ArgumentParser
from os import environ
from time import sleep
from typing import Any
from typing import cast

import requests

NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646")
NOMAD_TOKEN = environ.get("NOMAD_TOKEN")

# Seconds to wait for the Nomad API before giving up on a single request.
# Without a timeout, requests would block forever on an unresponsive server.
REQUEST_TIMEOUT = 30.0

# Seconds to sleep between polls while waiting for a status change.
POLL_INTERVAL = 5


def nomad_req(
    *path: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    method: str = "GET",
    timeout: float | None = REQUEST_TIMEOUT,
) -> list[dict[str, Any]] | dict[str, Any] | str:
    """Send a request to the Nomad API and return the decoded response.

    Args:
        *path: Path segments joined with ``/`` and appended to
            ``{NOMAD_ADDR}/v1/``.
        params: Optional query-string parameters.
        data: Optional JSON request body.
        method: HTTP method to use.
        timeout: Seconds to wait for the server; ``None`` disables the timeout.

    Returns:
        The parsed JSON body, or the raw response text when the body is not
        valid JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status. For error statuses the
            response body is printed before re-raising.
    """
    headers = {
        "Content-Type": "application/json",
    }
    if NOMAD_TOKEN:
        headers["X-Nomad-Token"] = NOMAD_TOKEN

    response = requests.request(
        method,
        f"{NOMAD_ADDR}/v1/{'/'.join(path)}",
        params=params,
        json=data,
        headers=headers,
        timeout=timeout,
    )

    try:
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Show the server's explanation before propagating the error.
        print(response.text)
        raise

    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        return response.text


def wait_for_job_alloc_status(job_id: str, status: str):
    """Block until every allocation of ``job_id`` reports ``status``.

    Polls the job's allocations every ``POLL_INTERVAL`` seconds. A job with no
    allocations is considered to satisfy the condition immediately.
    """
    while True:
        allocs = cast(
            list[dict[str, Any]],
            nomad_req("job", job_id, "allocations"),
        )
        if all(alloc["ClientStatus"] == status for alloc in allocs):
            return
        print(f"Waiting for all allocs to reach {status}...")
        sleep(POLL_INTERVAL)


def wait_for_eval_status(eval_id: str, status: str):
    """Block until the evaluation ``eval_id`` reports ``status``.

    Polls the evaluation every ``POLL_INTERVAL`` seconds.
    """
    while True:
        evaluation = cast(dict[str, Any], nomad_req("evaluation", eval_id))
        if evaluation["Status"] == status:
            return
        print(f"Waiting for eval to reach {status}...")
        sleep(POLL_INTERVAL)


def _build_parser() -> ArgumentParser:
    """Create the command-line parser for the script."""
    parser = ArgumentParser(
        description="Execute one off backups and restores of services",
    )
    parser.add_argument(
        "service_name",
        help="Name of the service to backup or restore",
    )
    parser.add_argument(
        "-a",
        "--action",
        default="backup",
        choices=("backup", "restore"),
        help="Action to take, backup or restore",
    )
    parser.add_argument(
        "-s",
        "--snapshot",
        default="latest",
        help="Backup snapshot to restore, if restore is the chosen action",
    )
    parser.add_argument(
        "-x",
        "--extra-safe",
        action="store_true",
        help="Perform extra safe backup or restore by stopping target job first",
    )
    return parser


def main() -> None:
    """Dispatch the one-off backup job for the service named on the command line."""
    args = _build_parser().parse_args()

    service_name = args.service_name
    service_info = nomad_req("service", service_name, params={"choose": "1|backups"})
    if not service_info:
        print(f"Could not find service {service_name}")
        sys.exit(1)

    service_info = cast(list[dict[str, Any]], service_info)
    node_id = service_info[0]["NodeID"]
    job_id = service_info[0]["JobID"]

    node = cast(dict[str, Any], nomad_req("node", node_id))
    node_name = node["Name"]

    backup_job_name = f"backup-oneoff-{node_name}"
    backup_job = nomad_req("job", backup_job_name)
    if not backup_job:
        print(f"Could not find backup job {backup_job_name} for {service_name}")
        # Bail out before stopping anything: without the backup job there is
        # nothing to dispatch.
        sys.exit(1)

    if args.extra_safe:
        print("Stopping job allocs")
        stop_job = nomad_req("job", job_id, method="DELETE")
        print(stop_job)

        wait_for_job_alloc_status(job_id, "complete")

    backup_job = cast(dict[str, Any], backup_job)
    backup_job_id = backup_job["ID"]

    dispatch = nomad_req(
        "job",
        backup_job_id,
        "dispatch",
        data={
            "Payload": None,
            "Meta": {
                "job_name": service_name,
                "task": args.action,
                "snapshot": args.snapshot,
            },
        },
        method="POST",
    )
    dispatch = cast(dict[str, Any], dispatch)
    print(dispatch)

    if args.extra_safe:
        print(f"Wait for {args.action} to finish")
        # NOTE(review): this waits on the dispatch evaluation only; confirm
        # that a "complete" evaluation implies the dispatched task finished.
        wait_for_eval_status(dispatch["EvalID"], "complete")

        print(f"{args.action.capitalize()} complete. Verify success and restart job")
        # TODO: If auto restarting, get versions and "revert" to version n-1
        # since n will be the recently stopped version


if __name__ == "__main__":
    main()