From 03ea3a32506cdac1c7c46de92c457704b215f39a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 8 Aug 2018 17:51:10 -0400 Subject: [PATCH] Add worker to backfill the new manifest tables from the tagmanifest table --- data/model/tag.py | 21 ++- workers/manifestbackfillworker.py | 155 ++++++++++++++++++++ workers/test/test_manifestbackfillworker.py | 72 +++++++++ 3 files changed, 241 insertions(+), 7 deletions(-) create mode 100644 workers/manifestbackfillworker.py create mode 100644 workers/test/test_manifestbackfillworker.py diff --git a/data/model/tag.py b/data/model/tag.py index a9e9285aa..ef49cf51f 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -605,12 +605,21 @@ def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest, s def _create_manifest(tag, manifest, storage_id_map): - media_type = Manifest.media_type.get_id(manifest.media_type) + manifest_row = populate_manifest(tag.repository, manifest, tag.image, storage_id_map) with db_transaction(): - manifest_row = Manifest.create(digest=manifest.digest, repository=tag.repository, + tag_manifest = TagManifest.create(tag=tag, digest=manifest.digest, json_data=manifest.bytes) + TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row) + return tag_manifest + + +def populate_manifest(repository, manifest, legacy_image, storage_id_map): + """ Populates the rows for the manifest, including its blobs and legacy image. 
""" + media_type = Manifest.media_type.get_id(manifest.media_type) + with db_transaction(): + manifest_row = Manifest.create(digest=manifest.digest, repository=repository, manifest_bytes=manifest.bytes, media_type=media_type) - ManifestLegacyImage.create(manifest=manifest_row, repository=tag.repository, image=tag.image) + ManifestLegacyImage.create(manifest=manifest_row, repository=repository, image=legacy_image) blobs_to_insert = [] blobs_created = set() @@ -623,16 +632,14 @@ def _create_manifest(tag, manifest, storage_id_map): if image_storage_id in blobs_created: continue - blobs_to_insert.append(dict(manifest=manifest_row, repository=tag.repository, + blobs_to_insert.append(dict(manifest=manifest_row, repository=repository, blob=image_storage_id)) blobs_created.add(image_storage_id) if blobs_to_insert: ManifestBlob.insert_many(blobs_to_insert).execute() - tag_manifest = TagManifest.create(tag=tag, digest=manifest.digest, json_data=manifest.bytes) - TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row) - return tag_manifest + return manifest_row def load_tag_manifest(namespace, repo_name, tag_name): diff --git a/workers/manifestbackfillworker.py b/workers/manifestbackfillworker.py new file mode 100644 index 000000000..5183ae0f5 --- /dev/null +++ b/workers/manifestbackfillworker.py @@ -0,0 +1,155 @@ +import logging +import logging.config + +from peewee import JOIN, fn, IntegrityError + +from app import app +from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image, + db_transaction) +from data.model.tag import populate_manifest +from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface, + DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE) +from workers.worker import Worker +from util.log import logfile_path +from util.migrate.allocator import yield_random_entries + +logger = logging.getLogger(__name__) + +class BrokenManifest(ManifestInterface): + """ Implementation of the 
ManifestInterface for "broken" manifests. This allows us to add the + new manifest row while not adding any additional rows for it. + """ + def __init__(self, digest, payload): + self._digest = digest + self._payload = payload + + @property + def digest(self): + return self._digest + + @property + def media_type(self): + return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE + + @property + def manifest_dict(self): + return {} + + @property + def bytes(self): + return self._payload + + @property + def layers(self): + return [] + + @property + def leaf_layer_v1_image_id(self): + return None + + @property + def blob_digests(self): + return [] + + +class ManifestBackfillWorker(Worker): + def __init__(self): + super(ManifestBackfillWorker, self).__init__() + self.add_operation(self._backfill_manifests, 1) + + def _candidates_to_backfill(self): + def missing_tmt_query(): + return (TagManifest + .select() + .join(TagManifestToManifest, JOIN.LEFT_OUTER) + .where(TagManifestToManifest.id >> None)) + + min_id = (TagManifest + .select(fn.Min(TagManifest.id)) + .join(TagManifestToManifest, JOIN.LEFT_OUTER) + .where(TagManifestToManifest.id >> None) + .scalar()) + max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar() + + iterator = yield_random_entries( + missing_tmt_query, + TagManifest.id, + 100, + max_id, + min_id, + ) + + return iterator + + def _backfill_manifests(self): + """ Backfills the new manifest tables for tag manifests that lack a mapping row. 
""" + with UseThenDisconnect(app.config): + iterator = self._candidates_to_backfill() + if iterator is None: + logger.debug('Found no additional images to scan') + return None + + for candidate, abt, _ in iterator: + if not backfill_manifest(candidate): + logger.info('Another worker pre-empted us for manifest: %s', candidate.id) + abt.set() + + +def lookup_map_row(tag_manifest): + try: + TagManifestToManifest.get(tag_manifest=tag_manifest) + return True + except TagManifestToManifest.DoesNotExist: + return False + + +def backfill_manifest(tag_manifest): + logger.info('Backfilling manifest %s', tag_manifest.id) + + # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. + if lookup_map_row(tag_manifest): + return False + + # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it + # without additional rows or data, as it will eventually not be useful. + is_broken = False + try: + manifest = DockerSchema1Manifest(tag_manifest.json_data, validate=False) + except ManifestException: + logger.exception('Exception when trying to parse manifest %s', tag_manifest.id) + manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data) + is_broken = True + + # Lookup the storages for the digests. + root_image = tag_manifest.tag.image + storage_id_map = {root_image.storage.content_checksum: root_image.storage.id} + for parent_image_id in root_image.ancestor_id_list(): + storage = Image.get(id=parent_image_id).storage + storage_id_map[storage.content_checksum] = storage.id + + with db_transaction(): + # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag. + try: + tag_manifest = TagManifest.get(id=tag_manifest.id) + except TagManifest.DoesNotExist: + return True + + # Create the new-style rows for the manifest. + manifest_row = populate_manifest(tag_manifest.tag.repository, manifest, + tag_manifest.tag.image, storage_id_map) + + # Create the mapping row. 
If we find another was created for this tag manifest in the + # meantime, then we've been preempted. + try: + TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row, + broken=is_broken) + return True + except IntegrityError: + return False + + + +if __name__ == "__main__": + logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) + worker = ManifestBackfillWorker() + worker.start() diff --git a/workers/test/test_manifestbackfillworker.py b/workers/test/test_manifestbackfillworker.py new file mode 100644 index 000000000..cbc678af9 --- /dev/null +++ b/workers/test/test_manifestbackfillworker.py @@ -0,0 +1,72 @@ +from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob, + ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image, + TagManifestLabel) +from workers.manifestbackfillworker import backfill_manifest + +from test.fixtures import * + +@pytest.fixture() +def clear_rows(initialized_db): + # Remove all new-style rows so we can backfill. + TagManifestLabelMap.delete().execute() + ManifestLabel.delete().execute() + ManifestBlob.delete().execute() + ManifestLegacyImage.delete().execute() + TagManifestToManifest.delete().execute() + Manifest.delete().execute() + + +def test_manifestbackfillworker(clear_rows, initialized_db): + for tag_manifest in TagManifest.select(): + # Backfill the manifest. + assert backfill_manifest(tag_manifest) + + # Ensure if we try again, the backfill is skipped. + assert not backfill_manifest(tag_manifest) + + # Ensure that we now have the expected manifest rows. 
+ map_row = TagManifestToManifest.get(tag_manifest=tag_manifest) + assert not map_row.broken + + manifest_row = map_row.manifest + assert manifest_row.manifest_bytes == tag_manifest.json_data + assert manifest_row.digest == tag_manifest.digest + assert manifest_row.repository == tag_manifest.tag.repository + + legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image + assert tag_manifest.tag.image == legacy_image + + expected_storages = {tag_manifest.tag.image.storage.id} + for parent_image_id in tag_manifest.tag.image.ancestor_id_list(): + expected_storages.add(Image.get(id=parent_image_id).storage_id) + + found_storages = {manifest_blob.blob_id for manifest_blob + in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)} + assert expected_storages == found_storages + + +def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db): + # Delete existing tag manifest so we can reuse the tag. + TagManifestLabel.delete().execute() + TagManifest.delete().execute() + + # Add a broken manifest. + broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar', + tag=RepositoryTag.get()) + + # Ensure the backfill works. + assert backfill_manifest(broken_manifest) + + # Ensure the mapping is marked as broken. + map_row = TagManifestToManifest.get(tag_manifest=broken_manifest) + assert map_row.broken + + manifest_row = map_row.manifest + assert manifest_row.manifest_bytes == broken_manifest.json_data + assert manifest_row.digest == broken_manifest.digest + assert manifest_row.repository == broken_manifest.tag.repository + + legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image + assert broken_manifest.tag.image == legacy_image + + assert not list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))