From 08f92f8ba599de3c73c5a31f2059907183109e07 Mon Sep 17 00:00:00 2001 From: Ian Fijolek Date: Sat, 26 Aug 2023 15:59:50 -0700 Subject: [PATCH] Add new backup and orphaned service scripts --- scripts/nomad_backup_job.py | 128 +++++++++++++++++++++++++++++++ scripts/nomad_orphan_services.py | 52 +++++++++++++ 2 files changed, 180 insertions(+) create mode 100755 scripts/nomad_backup_job.py create mode 100755 scripts/nomad_orphan_services.py diff --git a/scripts/nomad_backup_job.py b/scripts/nomad_backup_job.py new file mode 100755 index 0000000..12023ae --- /dev/null +++ b/scripts/nomad_backup_job.py @@ -0,0 +1,128 @@ +#! /usr/bin/env python3 +from os import environ +from time import sleep +from typing import Any +from typing import cast +from argparse import ArgumentParser + +import requests + + +NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646") +NOMAD_TOKEN = environ.get("NOMAD_TOKEN") + + +def nomad_req( + *path: str, + params: dict[str, Any] | None = None, + data: dict[str, Any] | None = None, + method="GET", +) -> list[dict[str, Any]] | dict[str, Any] | str: + headers = { + "Content-Type": "application/json", + } + if NOMAD_TOKEN: + headers["X-Nomad-Token"] = NOMAD_TOKEN + + response = requests.request( + method, + f"{NOMAD_ADDR}/v1/{'/'.join(path)}", + params=params, + json=data, + headers=headers, + ) + try: + response.raise_for_status() + except requests.exceptions.RequestException as ex: + print(response.text) + raise ex + + try: + return response.json() + except requests.exceptions.JSONDecodeError: + return response.text + + +def wait_for_job_alloc_status(job_id: str, status: str): + allocs = nomad_req("job", job_id, "allocations") + allocs = cast(list[dict[str, Any]], allocs) + + while not all(alloc["ClientStatus"] == status for alloc in allocs): + print(f"Waiting for all allocs to reach {status}...") + sleep(5) + allocs = nomad_req("job", job_id, "allocations") + allocs = cast(list[dict[str, Any]], allocs) + + +def wait_for_eval_status(eval_id: str, status: str): + eval = nomad_req("evaluation", eval_id) + eval = cast(dict[str, Any], eval) + + while eval["Status"] != status: + print(f"Waiting for eval to reach {status}...") + sleep(5) + eval = nomad_req("evaluation", eval_id) + eval = cast(dict[str, Any], eval) + + +parser = ArgumentParser( + description="Execute one off backups and restores of services", +) +parser.add_argument("service_name", help="Name of the service to backup or restore") +parser.add_argument("-a", "--action", default="backup", choices=("backup", "restore"), help="Action to take, backup or restore") +parser.add_argument("-s", "--snapshot", default="latest", help="Backup snapshot to restore, if restore is the chosen action") +parser.add_argument("-x", "--extra-safe", action="store_true", help="Perform extra safe backup or restore by stoping target job first") +args = parser.parse_args() + +service_name = args.service_name +service_info = nomad_req("service", service_name, params={"choose": "1|backups"}) + +if not service_info: + print(f"Could not find service {service_name}") + exit(1) + +service_info = cast(list[dict[str, Any]], service_info) +node_id = service_info[0]["NodeID"] +job_id = service_info[0]["JobID"] + +node = nomad_req("node", node_id) +node = cast(dict[str, Any], node) +node_name = node["Name"] +backup_job_name = f"backup-oneoff-{node_name}" + +backup_job = nomad_req("job", backup_job_name) +if not backup_job: + print(f"Could not find backup job {backup_job_name} for {service_name}") + +if args.extra_safe: + print("Stopping job allocs") + stop_job = nomad_req("job", job_id, method="DELETE") + print(stop_job) + wait_for_job_alloc_status(job_id, "complete") + +backup_job = cast(dict[str, Any], backup_job) +backup_job_id = backup_job["ID"] + +dispatch = nomad_req( + "job", + backup_job_id, + "dispatch", + data={ + "Payload": None, + "Meta": { + "job_name": service_name, + "task": args.action, + "snapshot": args.snapshot, + }, + }, + method="POST", +) +dispatch = cast(dict[str, Any], dispatch) +print(dispatch) + +if args.extra_safe: + print(f"Wait for {args.action} to finish") + wait_for_eval_status(dispatch["EvalID"], "complete") + + print("Backup complete. Verify success and restart job") + # If auto restarting, get versions and "revert" to version n-1 since n will be the recently stopped version diff --git a/scripts/nomad_orphan_services.py b/scripts/nomad_orphan_services.py new file mode 100755 index 0000000..75a43ad --- /dev/null +++ b/scripts/nomad_orphan_services.py @@ -0,0 +1,52 @@ +#! /usr/bin/env python3 +from os import environ +from typing import Any +from typing import cast + +import requests + + +NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646") +NOMAD_TOKEN = environ.get("NOMAD_TOKEN") + + +def nomad_req( + *path: str, params: dict[str, Any] | None = None, method="GET" +) -> list[dict[str, Any]] | dict[str, Any] | str: + headers = {} + if NOMAD_TOKEN: + headers["X-Nomad-Token"] = NOMAD_TOKEN + + response = requests.request( + method, + f"{NOMAD_ADDR}/v1/{'/'.join(path)}", + params=params, + headers=headers, + ) + response.raise_for_status() + + try: + return response.json() + except requests.exceptions.JSONDecodeError: + return response.text + + +for namespace in nomad_req("services"): + namespace = cast(dict[str, Any], namespace) + for service in namespace["Services"]: + service_name = service["ServiceName"] + for service_instance in nomad_req("service", service_name): + service_instance = cast(dict[str, Any], service_instance) + service_id = service_instance["ID"] + alloc_id = service_instance["AllocID"] + + try: + alloc = nomad_req("allocation", alloc_id) + continue + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + print( + f"alloc {alloc_id} not found for {service_name}. Deleting {service_id}" + ) + nomad_req("service", service_name, service_id, method="DELETE") + raise e