import logging import logging.config from peewee import JOIN, fn, IntegrityError from app import app from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image, db_transaction) from data.model.image import get_parent_images from data.model.tag import populate_manifest from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface, DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE) from workers.worker import Worker from util.log import logfile_path from util.migrate.allocator import yield_random_entries logger = logging.getLogger(__name__) WORKER_TIMEOUT = 600 class BrokenManifest(ManifestInterface): """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the new manifest row while not adding any additional rows for it. """ def __init__(self, digest, payload): self._digest = digest self._payload = payload @property def digest(self): return self._digest @property def media_type(self): return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE @property def manifest_dict(self): return {} @property def bytes(self): return self._payload @property def layers(self): return [] @property def leaf_layer_v1_image_id(self): return None @property def blob_digests(self): return [] class ManifestBackfillWorker(Worker): def __init__(self): super(ManifestBackfillWorker, self).__init__() self.add_operation(self._backfill_manifests, WORKER_TIMEOUT) def _candidates_to_backfill(self): def missing_tmt_query(): return (TagManifest .select() .join(TagManifestToManifest, JOIN.LEFT_OUTER) .where(TagManifestToManifest.id >> None)) min_id = (TagManifest .select(fn.Min(TagManifest.id)) .join(TagManifestToManifest, JOIN.LEFT_OUTER) .where(TagManifestToManifest.id >> None) .scalar()) max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar() iterator = yield_random_entries( missing_tmt_query, TagManifest.id, 100, max_id, min_id, ) return iterator def _backfill_manifests(self): """ Performs garbage collection on repositories. """ with UseThenDisconnect(app.config): iterator = self._candidates_to_backfill() if iterator is None: logger.debug('Found no additional images to scan') return None for candidate, abt, _ in iterator: if not backfill_manifest(candidate): logger.info('Another worker pre-empted us for manifest: %s', candidate.id) abt.set() def lookup_map_row(tag_manifest): try: TagManifestToManifest.get(tag_manifest=tag_manifest) return True except TagManifestToManifest.DoesNotExist: return False def backfill_manifest(tag_manifest): logger.info('Backfilling manifest %s', tag_manifest.id) # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. if lookup_map_row(tag_manifest): return False # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it # without additional rows or data, as it will eventually not be useful. is_broken = False try: manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data, validate=False) except ManifestException: logger.exception('Exception when trying to parse manifest %s', tag_manifest.id) manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data) is_broken = True # Lookup the storages for the digests. root_image = tag_manifest.tag.image repository = tag_manifest.tag.repository image_storage_id_map = {root_image.storage.content_checksum: root_image.storage.id} parent_images = get_parent_images(repository.namespace_user.username, repository.name, root_image) for parent_image in parent_images: image_storage_id_map[parent_image.storage.content_checksum] = parent_image.storage.id # Ensure that all the expected blobs have been found. If not, we lookup the blob under the repo # and add its storage ID. If the blob is not found, we mark the manifest as broken. storage_ids = set() for blob_digest in manifest.blob_digests: if blob_digest in image_storage_id_map: storage_ids.add(image_storage_id_map[blob_digest]) else: logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo', blob_digest, tag_manifest.id) try: blob_storage = get_repo_blob_by_digest(repository.namespace_user.username, repository.name, blob_digest) storage_ids.add(blob_storage.id) except BlobDoesNotExist: logger.debug('Blob `%s` not found in repo for manifest `%s`', blob_digest, tag_manifest.id) is_broken = True with db_transaction(): # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag. try: tag_manifest = TagManifest.get(id=tag_manifest.id) except TagManifest.DoesNotExist: return True # Create the new-style rows for the manifest. manifest_row = populate_manifest(tag_manifest.tag.repository, manifest, tag_manifest.tag.image, storage_ids) # Create the mapping row. If we find another was created for this tag manifest in the # meantime, then we've been preempted. try: TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row, broken=is_broken) return True except IntegrityError: return False if __name__ == "__main__": logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) worker = ManifestBackfillWorker() worker.start()