diff --git a/conf/init/service/batch/tagbackfillworker/log/run b/conf/init/service/batch/tagbackfillworker/log/run
new file mode 100755
index 000000000..1aaabc9b5
--- /dev/null
+++ b/conf/init/service/batch/tagbackfillworker/log/run
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+# Start the logger
+exec logger -i -t tagbackfillworker
\ No newline at end of file
diff --git a/conf/init/service/batch/tagbackfillworker/run b/conf/init/service/batch/tagbackfillworker/run
new file mode 100755
index 000000000..0a5ad5663
--- /dev/null
+++ b/conf/init/service/batch/tagbackfillworker/run
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo 'Starting tag backfill worker'
+
+QUAYPATH=${QUAYPATH:-"."}
+cd ${QUAYDIR:-"/"}
+PYTHONPATH=$QUAYPATH venv/bin/python -m workers.tagbackfillworker 2>&1
+
+echo 'Repository tag backfill exited'
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index 04ba3a117..8f0aa377d 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -64,7 +64,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
       if backfill_if_necessary:
         return self.backfill_manifest_for_tag(tag)
 
-      return
+      return None
 
     return Manifest.for_tag_manifest(tag_manifest)
 
diff --git a/workers/tagbackfillworker.py b/workers/tagbackfillworker.py
new file mode 100644
index 000000000..2155fd38e
--- /dev/null
+++ b/workers/tagbackfillworker.py
@@ -0,0 +1,361 @@
+import logging
+import logging.config
+
+import time
+
+from peewee import JOIN, fn, IntegrityError
+
+from app import app
+from data.database import (UseThenDisconnect, TagToRepositoryTag, RepositoryTag,
+                           TagManifestToManifest, Tag, TagManifest, Image,
+                           Manifest, TagManifestLabel, ManifestLabel, TagManifestLabelMap, db_transaction)
+from data.model import DataModelException
+from data.model.image import get_parent_images
+from data.model.tag import populate_manifest
+from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist
+from data.registry_model import pre_oci_model
+from data.registry_model.datatypes import Tag as TagDataType
+from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
+                                  DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
+
+from workers.worker import Worker
+from util.log import logfile_path
+from util.migrate.allocator import yield_random_entries
+
+logger = logging.getLogger(__name__)
+
+WORKER_TIMEOUT = 600
+
+
+class BrokenManifest(ManifestInterface):
+  """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the
+      new manifest row while not adding any additional rows for it.
+  """
+  def __init__(self, digest, payload):
+    self._digest = digest
+    self._payload = payload
+
+  @property
+  def digest(self):
+    return self._digest
+
+  @property
+  def media_type(self):
+    return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE
+
+  @property
+  def manifest_dict(self):
+    return {}
+
+  @property
+  def bytes(self):
+    return self._payload
+
+  def get_layers(self, content_retriever):
+    return None
+
+  def get_legacy_image_ids(self, cr):
+    return []
+
+  def get_leaf_layer_v1_image_id(self, cr):
+    return None
+
+  @property
+  def blob_digests(self):
+    return []
+
+  @property
+  def local_blob_digests(self):
+    return []
+
+  def child_manifests(self, lookup_manifest_fn):
+    return None
+
+  def get_manifest_labels(self, lookup_config_fn):
+    return {}
+
+  def unsigned(self):
+    return self
+
+  def generate_legacy_layers(self, images_map, lookup_config_fn):
+    return None
+
+  def get_schema1_manifest(self, namespace_name, repo_name, tag_name, lookup_fn):
+    return self
+
+  @property
+  def schema_version(self):
+    return 1
+
+  @property
+  def layers_compressed_size(self):
+    return None
+
+  @property
+  def is_manifest_list(self):
+    return False
+
+  @property
+  def has_legacy_image(self):
+    return False
+
+  def get_requires_empty_layer_blob(self, content_retriever):
+    return False
+
+
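+# A minimal sketch of the behavior the backfill relies on from the stub above: only the
+# digest and raw payload of an unparseable manifest are carried forward, so the Manifest
+# row can be written without any dependent blob, layer, or label rows. The digest and
+# payload values here are hypothetical:
+#
+#   broken = BrokenManifest('sha256:0123abcd', '{not-valid-json')
+#   assert broken.digest == 'sha256:0123abcd'
+#   assert broken.blob_digests == []    # hence no ManifestBlob rows
+#   assert not broken.is_manifest_list
+
+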
+ """ + def __init__(self, digest, payload): + self._digest = digest + self._payload = payload + + @property + def digest(self): + return self._digest + + @property + def media_type(self): + return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE + + @property + def manifest_dict(self): + return {} + + @property + def bytes(self): + return self._payload + + def get_layers(self, content_retriever): + return None + + def get_legacy_image_ids(self, cr): + return [] + + def get_leaf_layer_v1_image_id(self, cr): + return None + + @property + def blob_digests(self): + return [] + + @property + def local_blob_digests(self): + return [] + + def child_manifests(self, lookup_manifest_fn): + return None + + def get_manifest_labels(self, lookup_config_fn): + return {} + + def unsigned(self): + return self + + def generate_legacy_layers(self, images_map, lookup_config_fn): + return None + + def get_schema1_manifest(self, namespace_name, repo_name, tag_name, lookup_fn): + return self + + @property + def schema_version(self): + return 1 + + @property + def layers_compressed_size(self): + return None + + @property + def is_manifest_list(self): + return False + + @property + def has_legacy_image(self): + return False + + def get_requires_empty_layer_blob(self, content_retriever): + return False + + +class TagBackfillWorker(Worker): + def __init__(self): + super(TagBackfillWorker, self).__init__() + self.add_operation(self._backfill_tags, WORKER_TIMEOUT) + + def _candidates_to_backfill(self): + def missing_tmt_query(): + return (RepositoryTag + .select() + .join(TagToRepositoryTag, JOIN.LEFT_OUTER) + .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False)) + + min_id = (RepositoryTag + .select(fn.Min(RepositoryTag.id)) + .join(TagToRepositoryTag, JOIN.LEFT_OUTER) + .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False) + .scalar()) + max_id = RepositoryTag.select(fn.Max(RepositoryTag.id)).scalar() + + iterator = yield_random_entries( + missing_tmt_query, + RepositoryTag.id, + 1000, + max_id, + min_id, + ) + + return iterator + + def _backfill_tags(self): + with UseThenDisconnect(app.config): + iterator = self._candidates_to_backfill() + if iterator is None: + logger.debug('Found no additional tags to backfill') + time.sleep(10000) + return None + + for candidate, abt, _ in iterator: + if not backfill_tag(candidate): + logger.info('Another worker pre-empted us for label: %s', candidate.id) + abt.set() + + +def lookup_map_row(repositorytag): + try: + TagToRepositoryTag.get(repository_tag=repositorytag) + return True + except TagToRepositoryTag.DoesNotExist: + return False + + +def backfill_tag(repositorytag): + logger.info('Backfilling tag %s', repositorytag.id) + + # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. + if lookup_map_row(repositorytag): + return False + + # Grab the manifest for the RepositoryTag, backfilling is necessary. + manifest_id = _get_manifest_id(repositorytag) + if manifest_id is None: + return False + + lifetime_start_ms = (repositorytag.lifetime_start_ts * 1000 + if repositorytag.lifetime_start_ts else None) + lifetime_end_ms = (repositorytag.lifetime_end_ts * 1000 + if repositorytag.lifetime_end_ts else None) + + # Create the new Tag. 
+def lookup_map_row(repositorytag):
+  try:
+    TagToRepositoryTag.get(repository_tag=repositorytag)
+    return True
+  except TagToRepositoryTag.DoesNotExist:
+    return False
+
+
+def backfill_tag(repositorytag):
+  logger.info('Backfilling tag %s', repositorytag.id)
+
+  # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
+  if lookup_map_row(repositorytag):
+    return False
+
+  # Grab the manifest for the RepositoryTag, backfilling if necessary.
+  manifest_id = _get_manifest_id(repositorytag)
+  if manifest_id is None:
+    return False
+
+  lifetime_start_ms = (repositorytag.lifetime_start_ts * 1000
+                       if repositorytag.lifetime_start_ts else None)
+  lifetime_end_ms = (repositorytag.lifetime_end_ts * 1000
+                     if repositorytag.lifetime_end_ts else None)
+
+  # Create the new Tag.
+  with db_transaction():
+    if lookup_map_row(repositorytag):
+      return False
+
+    try:
+      created = Tag.create(name=repositorytag.name,
+                           repository=repositorytag.repository,
+                           lifetime_start_ms=lifetime_start_ms,
+                           lifetime_end_ms=lifetime_end_ms,
+                           reversion=repositorytag.reversion,
+                           manifest=manifest_id,
+                           tag_kind=Tag.tag_kind.get_id('tag'))
+
+      TagToRepositoryTag.create(tag=created, repository_tag=repositorytag,
+                                repository=repositorytag.repository)
+    except IntegrityError:
+      logger.exception('Could not create tag for repo tag `%s`', repositorytag.id)
+      return False
+
+  logger.info('Backfilled tag %s', repositorytag.id)
+  return True
+
+
+def lookup_manifest_map_row(tag_manifest):
+  try:
+    TagManifestToManifest.get(tag_manifest=tag_manifest)
+    return True
+  except TagManifestToManifest.DoesNotExist:
+    return False
+
+
+def _get_manifest_id(repositorytag):
+  repository_tag_datatype = TagDataType.for_repository_tag(repositorytag)
+
+  # Retrieve the TagManifest for the RepositoryTag, backfilling if necessary.
+  with db_transaction():
+    manifest_datatype = pre_oci_model.get_manifest_for_tag(repository_tag_datatype,
+                                                           backfill_if_necessary=True)
+    if manifest_datatype is None:
+      logger.error('Missing manifest for tag `%s`', repositorytag.id)
+      return None
+
+  # Retrieve the new-style Manifest for the TagManifest, if any.
+  try:
+    tag_manifest = TagManifest.get(id=manifest_datatype._db_id)
+  except TagManifest.DoesNotExist:
+    logger.exception('Could not find tag manifest')
+    return None
+
+  try:
+    return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id
+  except TagManifestToManifest.DoesNotExist:
+    # Could not find the new-style manifest, so backfill.
+    _backfill_manifest(tag_manifest)
+
+  # Try to retrieve the manifest again, since we've performed a backfill.
+  try:
+    return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id
+  except TagManifestToManifest.DoesNotExist:
+    return None
+
+
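+# Usage sketch (not part of the worker's flow): a single tag can be backfilled by hand
+# from an interactive shell, assuming the app is configured; the tag id is hypothetical:
+#
+#   from data.database import RepositoryTag
+#   from workers.tagbackfillworker import backfill_tag
+#
+#   repo_tag = RepositoryTag.get(RepositoryTag.id == 1234)
+#   backfill_tag(repo_tag)  # False means a mapping row already existed
+
+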
+def _backfill_manifest(tag_manifest):
+  logger.info('Backfilling manifest for tag manifest %s', tag_manifest.id)
+
+  # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
+  if lookup_manifest_map_row(tag_manifest):
+    return False
+
+  # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it
+  # without additional rows or data, as it will eventually not be useful.
+  is_broken = False
+  try:
+    manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data, validate=False)
+  except ManifestException:
+    logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
+    manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data)
+    is_broken = True
+
+  # Lookup the storages for the digests.
+  root_image = tag_manifest.tag.image
+  repository = tag_manifest.tag.repository
+
+  image_storage_id_map = {root_image.storage.content_checksum: root_image.storage.id}
+
+  try:
+    parent_images = get_parent_images(repository.namespace_user.username, repository.name,
+                                      root_image)
+  except DataModelException:
+    logger.exception('Exception when trying to load parent images for manifest `%s`',
+                     tag_manifest.id)
+    parent_images = []
+    is_broken = True
+
+  for parent_image in parent_images:
+    image_storage_id_map[parent_image.storage.content_checksum] = parent_image.storage.id
+
+  # Ensure that all the expected blobs have been found. If not, we lookup the blob under the repo
+  # and add its storage ID. If the blob is not found, we mark the manifest as broken.
+  storage_ids = set()
+  for blob_digest in manifest.blob_digests:
+    if blob_digest in image_storage_id_map:
+      storage_ids.add(image_storage_id_map[blob_digest])
+    else:
+      logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo',
+                   blob_digest, tag_manifest.id)
+      try:
+        blob_storage = get_repo_blob_by_digest(repository.namespace_user.username, repository.name,
+                                               blob_digest)
+        storage_ids.add(blob_storage.id)
+      except BlobDoesNotExist:
+        logger.debug('Blob `%s` not found in repo for manifest `%s`',
+                     blob_digest, tag_manifest.id)
+        is_broken = True
+
+  with db_transaction():
+    # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag.
+    try:
+      tag_manifest = TagManifest.get(id=tag_manifest.id)
+    except TagManifest.DoesNotExist:
+      return True
+
+    # Ensure it wasn't already created.
+    if lookup_manifest_map_row(tag_manifest):
+      return False
+
+    # Check for a pre-existing manifest matching the digest in the repository. This can happen
+    # if we've already created the manifest row (typically for tag reversion).
+    try:
+      manifest_row = Manifest.get(digest=manifest.digest, repository=tag_manifest.tag.repository)
+    except Manifest.DoesNotExist:
+      # Create the new-style rows for the manifest.
+      try:
+        manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
+                                         tag_manifest.tag.image, storage_ids)
+      except IntegrityError:
+        # Pre-empted.
+        return False
+
+    # Create the mapping row. If we find another was created for this tag manifest in the
+    # meantime, then we've been preempted.
+    try:
+      TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
+                                   broken=is_broken)
+    except IntegrityError:
+      return False
+
+    # Backfill any labels on the manifest.
+    _backfill_labels(tag_manifest, manifest_row, repository)
+    return True
+
+
+def _backfill_labels(tag_manifest, manifest, repository):
+  tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest))
+  if not tmls:
+    return
+
+  for tag_manifest_label in tmls:
+    label = tag_manifest_label.label
+    try:
+      TagManifestLabelMap.get(tag_manifest_label=tag_manifest_label)
+      continue
+    except TagManifestLabelMap.DoesNotExist:
+      pass
+
+    try:
+      manifest_label = ManifestLabel.create(manifest=manifest, label=label,
+                                            repository=repository)
+      TagManifestLabelMap.create(manifest_label=manifest_label,
+                                 tag_manifest_label=tag_manifest_label,
+                                 label=label,
+                                 manifest=manifest,
+                                 tag_manifest=tag_manifest_label.annotated)
+    except IntegrityError:
+      continue
+
+
+if __name__ == "__main__":
+  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
+
+  if not app.config.get('BACKFILL_TAGS', False):
+    logger.debug('Tag backfill disabled; skipping')
+    while True:
+      time.sleep(100000)
+
+  worker = TagBackfillWorker()
+  worker.start()
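The worker is gated on the BACKFILL_TAGS config flag, as the __main__ block above shows; when the flag is unset it sleeps forever so the service slot stays occupied without doing any work. A sketch of starting it programmatically, equivalent to what the run script does (this assumes BACKFILL_TAGS: true has been set in the app config):

    from workers.tagbackfillworker import TagBackfillWorker

    worker = TagBackfillWorker()
    worker.start()  # blocks; _backfill_tags is scheduled roughly every WORKER_TIMEOUT (600) seconds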
diff --git a/workers/test/test_tagbackfillworker.py b/workers/test/test_tagbackfillworker.py
new file mode 100644
index 000000000..0dd2d2e70
--- /dev/null
+++ b/workers/test/test_tagbackfillworker.py
@@ -0,0 +1,223 @@
+import pytest
+
+from app import docker_v2_signing_key
+from data import model
+from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
+                           ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
+                           TagManifestLabel, Tag, TagToRepositoryTag)
+from image.docker.schema1 import DockerSchema1ManifestBuilder
+from workers.tagbackfillworker import backfill_tag, _backfill_manifest
+
+from test.fixtures import *
+
+
+@pytest.fixture()
+def clear_rows(initialized_db):
+  # Remove all new-style rows so we can backfill.
+  TagToRepositoryTag.delete().execute()
+  Tag.delete().execute()
+  TagManifestLabelMap.delete().execute()
+  ManifestLabel.delete().execute()
+  ManifestBlob.delete().execute()
+  ManifestLegacyImage.delete().execute()
+  TagManifestToManifest.delete().execute()
+  Manifest.delete().execute()
+
+
+@pytest.mark.parametrize('clear_all_rows', [
+  True,
+  False,
+])
+def test_tagbackfillworker(clear_all_rows, initialized_db):
+  # Remove the new-style rows so we can backfill.
+  TagToRepositoryTag.delete().execute()
+  Tag.delete().execute()
+
+  if clear_all_rows:
+    TagManifestLabelMap.delete().execute()
+    ManifestLabel.delete().execute()
+    ManifestBlob.delete().execute()
+    ManifestLegacyImage.delete().execute()
+    TagManifestToManifest.delete().execute()
+    Manifest.delete().execute()
+
+  for repository_tag in list(RepositoryTag.select()):
+    # Backfill the tag.
+    assert backfill_tag(repository_tag)
+
+    # Ensure if we try again, the backfill is skipped.
+    assert not backfill_tag(repository_tag)
+
+    # Ensure that we now have the expected tag rows.
+    tag_to_repo_tag = TagToRepositoryTag.get(repository_tag=repository_tag)
+    tag = tag_to_repo_tag.tag
+    assert tag.name == repository_tag.name
+    assert tag.repository == repository_tag.repository
+    assert not tag.hidden
+    assert tag.reversion == repository_tag.reversion
+
+    if repository_tag.lifetime_start_ts is None:
+      assert tag.lifetime_start_ms is None
+    else:
+      assert tag.lifetime_start_ms == (repository_tag.lifetime_start_ts * 1000)
+
+    if repository_tag.lifetime_end_ts is None:
+      assert tag.lifetime_end_ms is None
+    else:
+      assert tag.lifetime_end_ms == (repository_tag.lifetime_end_ts * 1000)
+
+    assert tag.manifest
+
+    # Ensure that we now have the expected manifest rows.
+    try:
+      tag_manifest = TagManifest.get(tag=repository_tag)
+    except TagManifest.DoesNotExist:
+      continue
+
+    map_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
+    assert not map_row.broken
+
+    manifest_row = map_row.manifest
+    assert manifest_row.manifest_bytes == tag_manifest.json_data
+    assert manifest_row.digest == tag_manifest.digest
+    assert manifest_row.repository == tag_manifest.tag.repository
+
+    assert tag.manifest == map_row.manifest
+
+    legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
+    assert tag_manifest.tag.image == legacy_image
+
+    expected_storages = {tag_manifest.tag.image.storage.id}
+    for parent_image_id in tag_manifest.tag.image.ancestor_id_list():
+      expected_storages.add(Image.get(id=parent_image_id).storage_id)
+
+    found_storages = {manifest_blob.blob_id for manifest_blob
+                      in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)}
+    assert expected_storages == found_storages
+
+    # Ensure the labels were copied over.
+    tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest))
+    expected_labels = {tml.label_id for tml in tmls}
+    found_labels = {m.label_id for m
+                    in ManifestLabel.select().where(ManifestLabel.manifest == manifest_row)}
+    assert found_labels == expected_labels
+
+
+def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
+  # Delete existing tag manifest so we can reuse the tag.
+  TagManifestLabel.delete().execute()
+  TagManifest.delete().execute()
+
+  # Add a broken manifest.
+  broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar',
+                                       tag=RepositoryTag.get())
+
+  # Ensure the backfill works.
+  assert _backfill_manifest(broken_manifest)
+
+  # Ensure the mapping is marked as broken.
+  map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
+  assert map_row.broken
+
+  manifest_row = map_row.manifest
+  assert manifest_row.manifest_bytes == broken_manifest.json_data
+  assert manifest_row.digest == broken_manifest.digest
+  assert manifest_row.repository == broken_manifest.tag.repository
+
+  legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
+  assert broken_manifest.tag.image == legacy_image
+
+
+def test_manifestbackfillworker_mislinked_manifest(clear_rows, initialized_db):
+  """ Tests that a manifest whose image is mislinked will have its storages relinked properly. """
+  # Delete existing tag manifest so we can reuse the tag.
+  TagManifestLabel.delete().execute()
+  TagManifest.delete().execute()
+
+  repo = model.repository.get_repository('devtable', 'gargantuan')
+  tag_v30 = model.tag.get_active_tag('devtable', 'gargantuan', 'v3.0')
+  tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')
+
+  # Add a mislinked manifest, by having its layer point to a blob in v3.0 but its image
+  # be the v5.0 image.
+  builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
+  builder.add_layer(tag_v30.image.storage.content_checksum, '{"id": "foo"}')
+  manifest = builder.build(docker_v2_signing_key)
+
+  mislinked_manifest = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
+                                          tag=tag_v50)
+
+  # Backfill the manifest and ensure its proper content checksum was linked.
+  assert _backfill_manifest(mislinked_manifest)
+
+  map_row = TagManifestToManifest.get(tag_manifest=mislinked_manifest)
+  assert not map_row.broken
+
+  manifest_row = map_row.manifest
+  legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
+  assert legacy_image == tag_v50.image
+
+  manifest_blobs = list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))
+  assert len(manifest_blobs) == 1
+  assert manifest_blobs[0].blob.content_checksum == tag_v30.image.storage.content_checksum
+
+
+def test_manifestbackfillworker_mislinked_invalid_manifest(clear_rows, initialized_db):
+  """ Tests that a manifest whose image is mislinked will attempt to have its storages relinked
+      properly. """
+  # Delete existing tag manifest so we can reuse the tag.
+  TagManifestLabel.delete().execute()
+  TagManifest.delete().execute()
+
+  repo = model.repository.get_repository('devtable', 'gargantuan')
+  tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')
+
+  # Add a mislinked manifest, by having its layer point to an invalid blob but its image
+  # be the v5.0 image.
+  builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
+  builder.add_layer('sha256:deadbeef', '{"id": "foo"}')
+  manifest = builder.build(docker_v2_signing_key)
+
+  broken_manifest = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
+                                       tag=tag_v50)
+
+  # Backfill the manifest and ensure it is marked as broken.
+  assert _backfill_manifest(broken_manifest)
+
+  map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
+  assert map_row.broken
+
+  manifest_row = map_row.manifest
+  legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
+  assert legacy_image == tag_v50.image
+
+  manifest_blobs = list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))
+  assert len(manifest_blobs) == 0
+
+
+def test_manifestbackfillworker_repeat_digest(clear_rows, initialized_db):
+  """ Tests that a manifest with a shared digest will be properly linked. """
+  # Delete existing tag manifest so we can reuse the tag.
+  TagManifestLabel.delete().execute()
+  TagManifest.delete().execute()
+
+  repo = model.repository.get_repository('devtable', 'gargantuan')
+  tag_v30 = model.tag.get_active_tag('devtable', 'gargantuan', 'v3.0')
+  tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')
+
+  # Build a manifest and assign it to both tags (this is allowed in the old model).
+  builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
+  builder.add_layer('sha256:deadbeef', '{"id": "foo"}')
+  manifest = builder.build(docker_v2_signing_key)
+
+  manifest_1 = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
+                                  tag=tag_v30)
+  manifest_2 = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
+                                  tag=tag_v50)
+
+  # Backfill "both" manifests and ensure both are pointed to by a single resulting row.
+  assert _backfill_manifest(manifest_1)
+  assert _backfill_manifest(manifest_2)
+
+  map_row1 = TagManifestToManifest.get(tag_manifest=manifest_1)
+  map_row2 = TagManifestToManifest.get(tag_manifest=manifest_2)
+
+  assert map_row1.manifest == map_row2.manifest
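Two properties these tests pin down are worth restating: the backfill is idempotent per mapping row, and digest-identical manifests in a repository share a single Manifest row. A sketch of the idempotency contract, where tag_manifest stands for any existing TagManifest row:

    from workers.tagbackfillworker import _backfill_manifest

    assert _backfill_manifest(tag_manifest)      # creates the mapping and manifest rows
    assert not _backfill_manifest(tag_manifest)  # second run is preempted by the existing map row

The suite itself would typically be run with py.test workers/test/test_tagbackfillworker.py under the standard Quay test fixtures, though the exact invocation depends on the local setup.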