nomad-service-fixers/nomad_missing_services.py

108 lines
3.2 KiB
Python
Executable File

#! /usr/bin/env python3
from argparse import ArgumentParser
from os import environ
from typing import Any
from typing import cast
import requests_unixsocket
from requests.exceptions import JSONDecodeError
session = requests_unixsocket.Session()
NOMAD_ADDR = environ.get("NOMAD_ADDR", "http://127.0.0.1:4646")
NOMAD_TOKEN = environ.get("NOMAD_TOKEN")
def nomad_req(
*path: str, params: dict[str, Any] | None = None, method="GET"
) -> list[dict[str, Any]] | dict[str, Any] | str:
headers = {}
if NOMAD_TOKEN:
headers["X-Nomad-Token"] = NOMAD_TOKEN
response = session.request(
method,
f"{NOMAD_ADDR}/v1/{'/'.join(path)}",
params=params,
headers=headers,
)
response.raise_for_status()
try:
return response.json()
except JSONDecodeError:
return response.text
def extract_job_services(job: dict[str, Any]) -> dict[str, str]:
services: dict[str, str] = dict()
for group in job["TaskGroups"]:
for service in group.get("Services") or []:
services[service["Name"]] = group["Name"]
for task in group["Tasks"]:
for service in task.get("Services") or []:
services[service["Name"]] = group["Name"]
return services
exit_code = 0
parser = ArgumentParser(
description="Checks for missing services and optionally restarts their allocs.",
)
parser.add_argument(
"-r", "--restart", action="store_true", help="Restart allocs for missing services"
)
args = parser.parse_args()
for job in nomad_req("jobs"):
job = cast(dict[str, Any], job)
if job["Type"] in ("batch", "sysbatch"):
continue
if job["Status"] != "running":
print(f"WARNING: job {job['Name']} is {job['Status']}")
continue
job_detail = nomad_req("job", job["ID"])
job_detail = cast(dict[str, Any], job_detail)
expected_services = extract_job_services(job_detail)
found_services: set[str] = set()
for service in nomad_req("job", job_detail["ID"], "services"):
service = cast(dict[str, Any], service)
found_services.add(service["ServiceName"])
missing_services = set(expected_services) - found_services
restart_groups: set[str] = set()
for missing_service in missing_services:
print(f"ERROR: Missing service {missing_service} for job {job_detail['Name']}")
# print(job)
exit_code = 1
# Add group associated with missing service to set
restart_groups.add(expected_services[missing_service])
if not restart_groups or not args.restart:
continue
# Get allocts for groups that are missing services
restart_allocs: set[str] = set()
for allocation in nomad_req("job", job_detail["ID"], "allocations"):
allocation = cast(dict[str, Any], allocation)
if (
allocation["ClientStatus"] == "running"
and allocation["TaskGroup"] in restart_groups
):
restart_allocs.add(allocation["ID"])
# Restart allocs associated with missing services
for allocation in restart_allocs:
print(f"INFO: Restarting allocation {allocation}")
nomad_req("client", "allocation", allocation, "restart")
exit(exit_code)