""" Checks to see if newer tagged versions of running images exist When a newer tag based on semver is found, the tags will be printed and the script will exit with a non-zero code. """ from dataclasses import dataclass from typing import Generator from typing import List import docker # type: ignore import requests class NotComparableException(ValueError): pass @dataclass class ImageTag(object): image_tag: str image: str full_tag: str version: str tag_desc: str version_parts: List[int] @classmethod def from_str(cls, image_tag): image, _, full_tag = image_tag.partition(":") version, _, tag_desc = full_tag.partition("-") # Remove leading v if version[0].lower() == "v": version = version[1:] try: version_parts = [int(p) for p in version.split(".")] except ValueError: version_parts = [] return ImageTag( image_tag=image_tag, image=image, full_tag=full_tag, version=version, tag_desc=tag_desc, version_parts=version_parts, ) def is_same_image(self, other): return self.image == other.image def is_same_type(self, other): return self.tag_desc == other.tag_desc def is_same_grain(self, other): return len(self.version_parts) == len(other.version_parts) def is_comparable(self, other): return all( ( self.is_same_image(other), self.is_same_type(other), self.is_same_grain(other), ) ) def __eq__(self, other): if not self.is_comparable(other): raise NotComparableException() return self.version_parts == other.version_parts def __lt__(self, other): if not self.is_comparable(other): raise NotComparableException() for s, o in zip(self.version_parts, other.version_parts): if s < o: return True elif s > o: return False return False def is_newer_than(self, other): return self.is_comparable(other) and self > other def get_all_tags(image_name: str) -> Generator[ImageTag, None, None]: """Generates all tags for a given image""" if "/" not in image_name: image_name = f"library/{image_name}" url = "https://registry.hub.docker.com/v2/repositories/{}/tags".format( image_name, ) page_count = 0 max_pages = 1000 while url and page_count <= max_pages: data = requests.get(url).json() for tag in data["results"]: try: yield ImageTag.from_str(f"{image_name}:{tag['name']}") except ValueError: pass url = data["next"] page_count += 1 def generate_message(current: ImageTag, newer_tags: List[ImageTag]) -> str: if not current.version_parts: return f"[{current.image_tag}] No numeric version recognized" if not newer_tags: return f"[{current.image_tag}] No newer tags found for image tag" else: newer_tags = list(reversed(sorted(newer_tags))) tags_list = ", ".join(tag.version for tag in newer_tags) latest_tag = newer_tags[0].image_tag return f"[{current.image_tag}] New versions found {tags_list}. Recommended update to {latest_tag}" def run() -> int: client = docker.from_env() running_images = {container.image.tags[0] for container in client.containers.list()} has_update = False for image_name in running_images: current = ImageTag.from_str(image_name) newer_tags: List[ImageTag] = [] if current.version_parts: newer_tags = [ tag for tag in get_all_tags(current.image) if tag.is_newer_than(current) ] has_update |= bool(newer_tags) print(generate_message(current, newer_tags)) if has_update: return 1 return 0 if __name__ == "__main__": exit(run())