From 6963107ec67d00d1096ba9907c9cf430c7eb9f39 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 10 Dec 2018 15:37:26 -0500 Subject: [PATCH] Fix after merge --- workers/manifestbackfillworker.py | 252 ------------------------------ 1 file changed, 252 deletions(-) delete mode 100644 workers/manifestbackfillworker.py diff --git a/workers/manifestbackfillworker.py b/workers/manifestbackfillworker.py deleted file mode 100644 index d64e10c3b..000000000 --- a/workers/manifestbackfillworker.py +++ /dev/null @@ -1,252 +0,0 @@ -import logging -import logging.config -import time - -import time - -from peewee import JOIN, fn, IntegrityError - -from app import app -from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image, - Manifest, db_transaction) -from data.model import DataModelException -from data.model.image import get_parent_images -from data.model.tag import populate_manifest -from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist -from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface, - DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE) -from workers.worker import Worker -from util.log import logfile_path -from util.migrate.allocator import yield_random_entries - -logger = logging.getLogger(__name__) - -WORKER_TIMEOUT = 600 - -class BrokenManifest(ManifestInterface): - """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the - new manifest row while not adding any additional rows for it. - """ - def __init__(self, digest, payload): - self._digest = digest - self._payload = payload - - @property - def digest(self): - return self._digest - - @property - def media_type(self): - return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE - - @property - def manifest_dict(self): - return {} - - @property - def bytes(self): - return self._payload - - def get_layers(self, content_retriever): - return None - - def get_legacy_image_ids(self, cr): - return [] - - def get_leaf_layer_v1_image_id(self, cr): - return None - - @property - def blob_digests(self): - return [] - - @property - def local_blob_digests(self): - return [] - - def child_manifests(self, lookup_manifest_fn): - return None - - def get_manifest_labels(self, lookup_config_fn): - return {} - - def unsigned(self): - return self - - def generate_legacy_layers(self, images_map, lookup_config_fn): - return None - - def get_schema1_manifest(self, namespace_name, repo_name, tag_name, lookup_fn): - return self - - @property - def schema_version(self): - return 1 - - @property - def layers_compressed_size(self): - return None - - @property - def is_manifest_list(self): - return False - - @property - def has_legacy_image(self): - return False - - def get_requires_empty_layer_blob(self, content_retriever): - return False - - def convert_manifest(self, media_types, namespace_name, repo_name, tag_name, lookup_fn): - return None - - -class ManifestBackfillWorker(Worker): - def __init__(self): - super(ManifestBackfillWorker, self).__init__() - self.add_operation(self._backfill_manifests, WORKER_TIMEOUT) - - def _candidates_to_backfill(self): - def missing_tmt_query(): - return (TagManifest - .select() - .join(TagManifestToManifest, JOIN.LEFT_OUTER) - .where(TagManifestToManifest.id >> None)) - - min_id = (TagManifest - .select(fn.Min(TagManifest.id)) - .join(TagManifestToManifest, JOIN.LEFT_OUTER) - .where(TagManifestToManifest.id >> None) - .scalar()) - max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar() - - iterator = yield_random_entries( - missing_tmt_query, - TagManifest.id, - 100, - max_id, - min_id, - ) - - return iterator - - def _backfill_manifests(self): - with UseThenDisconnect(app.config): - iterator = self._candidates_to_backfill() - if iterator is None: - logger.debug('Found no additional manifest to backfill') - time.sleep(10000) - return None - - for candidate, abt, _ in iterator: - if not backfill_manifest(candidate): - logger.info('Another worker pre-empted us for manifest: %s', candidate.id) - abt.set() - - -def lookup_map_row(tag_manifest): - try: - TagManifestToManifest.get(tag_manifest=tag_manifest) - return True - except TagManifestToManifest.DoesNotExist: - return False - - -def backfill_manifest(tag_manifest): - logger.info('Backfilling manifest %s', tag_manifest.id) - - # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. - if lookup_map_row(tag_manifest): - return False - - # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it - # without additional rows or data, as it will eventually not be useful. - is_broken = False - try: - manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data, validate=False) - except ManifestException: - logger.exception('Exception when trying to parse manifest %s', tag_manifest.id) - manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data) - is_broken = True - - # Lookup the storages for the digests. - root_image = tag_manifest.tag.image - repository = tag_manifest.tag.repository - - image_storage_id_map = {root_image.storage.content_checksum: root_image.storage.id} - - try: - parent_images = get_parent_images(repository.namespace_user.username, repository.name, - root_image) - except DataModelException: - logger.exception('Exception when trying to load parent images for manifest `%s`', - tag_manifest.id) - parent_images = {} - is_broken = True - - for parent_image in parent_images: - image_storage_id_map[parent_image.storage.content_checksum] = parent_image.storage.id - - # Ensure that all the expected blobs have been found. If not, we lookup the blob under the repo - # and add its storage ID. If the blob is not found, we mark the manifest as broken. - storage_ids = set() - for blob_digest in manifest.blob_digests: - if blob_digest in image_storage_id_map: - storage_ids.add(image_storage_id_map[blob_digest]) - else: - logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo', - blob_digest, tag_manifest.id) - try: - blob_storage = get_repo_blob_by_digest(repository.namespace_user.username, repository.name, - blob_digest) - storage_ids.add(blob_storage.id) - except BlobDoesNotExist: - logger.debug('Blob `%s` not found in repo for manifest `%s`', - blob_digest, tag_manifest.id) - is_broken = True - - with db_transaction(): - # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag. - try: - tag_manifest = TagManifest.get(id=tag_manifest.id) - except TagManifest.DoesNotExist: - return True - - # Ensure it wasn't already created. - if lookup_map_row(tag_manifest): - return False - - # Check for a pre-existing manifest matching the digest in the repository. This can happen - # if we've already created the manifest row (typically for tag reverision). - try: - manifest_row = Manifest.get(digest=manifest.digest, repository=tag_manifest.tag.repository) - except Manifest.DoesNotExist: - # Create the new-style rows for the manifest. - try: - manifest_row = populate_manifest(tag_manifest.tag.repository, manifest, - tag_manifest.tag.image, storage_ids) - except IntegrityError: - # Pre-empted. - return False - - # Create the mapping row. If we find another was created for this tag manifest in the - # meantime, then we've been preempted. - try: - TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row, - broken=is_broken) - return True - except IntegrityError: - return False - - -if __name__ == "__main__": - logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - - if not app.config.get('BACKFILL_TAG_MANIFESTS', False): - logger.debug('Manifest backfill disabled; skipping') - while True: - time.sleep(100000) - - worker = ManifestBackfillWorker() - worker.start()