diff --git a/conf/init/service/batch/manifestbackfillworker/log/run b/conf/init/service/batch/manifestbackfillworker/log/run deleted file mode 100755 index de0b8e251..000000000 --- a/conf/init/service/batch/manifestbackfillworker/log/run +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -# Start the logger -exec logger -i -t manifestbackfillworker \ No newline at end of file diff --git a/conf/init/service/batch/manifestbackfillworker/run b/conf/init/service/batch/manifestbackfillworker/run deleted file mode 100755 index 4dc77e7f6..000000000 --- a/conf/init/service/batch/manifestbackfillworker/run +++ /dev/null @@ -1,9 +0,0 @@ -#! /bin/bash - -echo 'Starting manifest backfill worker' - -QUAYPATH=${QUAYPATH:-"."} -cd ${QUAYDIR:-"/"} -PYTHONPATH=$QUAYPATH venv/bin/python -m workers.manifestbackfillworker 2>&1 - -echo 'Repository manifest backfill exited' diff --git a/conf/init/service/batch/tagbackfillworker/log/run b/conf/init/service/batch/tagbackfillworker/log/run new file mode 100755 index 000000000..1aaabc9b5 --- /dev/null +++ b/conf/init/service/batch/tagbackfillworker/log/run @@ -0,0 +1,4 @@ +#!/bin/sh + +# Start the logger +exec logger -i -t tagbackfillworker \ No newline at end of file diff --git a/conf/init/service/batch/tagbackfillworker/run b/conf/init/service/batch/tagbackfillworker/run new file mode 100755 index 000000000..0a5ad5663 --- /dev/null +++ b/conf/init/service/batch/tagbackfillworker/run @@ -0,0 +1,9 @@ +#! /bin/bash + +echo 'Starting tag backfill worker' + +QUAYPATH=${QUAYPATH:-"."} +cd ${QUAYDIR:-"/"} +PYTHONPATH=$QUAYPATH venv/bin/python -m workers.tagbackfillworker 2>&1 + +echo 'Repository tag backfill exited' diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index 04ba3a117..8f0aa377d 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -64,7 +64,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface): if backfill_if_necessary: return self.backfill_manifest_for_tag(tag) - return + return None return Manifest.for_tag_manifest(tag_manifest) diff --git a/workers/manifestbackfillworker.py b/workers/tagbackfillworker.py similarity index 53% rename from workers/manifestbackfillworker.py rename to workers/tagbackfillworker.py index d64e10c3b..ed3bceb23 100644 --- a/workers/manifestbackfillworker.py +++ b/workers/tagbackfillworker.py @@ -1,20 +1,23 @@ import logging import logging.config -import time import time from peewee import JOIN, fn, IntegrityError from app import app -from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image, - Manifest, db_transaction) +from data.database import (UseThenDisconnect, TagToRepositoryTag, RepositoryTag, + TagManifestToManifest, Tag, TagManifest, TagManifestToManifest, Image, + Manifest, TagManifestLabel, ManifestLabel, TagManifestLabelMap, db_transaction) from data.model import DataModelException from data.model.image import get_parent_images from data.model.tag import populate_manifest from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist +from data.registry_model import pre_oci_model +from data.registry_model.datatypes import Tag as TagDataType from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface, DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE) + from workers.worker import Worker from util.log import logfile_path from util.migrate.allocator import yield_random_entries @@ -23,6 +26,7 @@ logger = logging.getLogger(__name__) WORKER_TIMEOUT = 600 + class BrokenManifest(ManifestInterface): """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the new manifest row while not adding any additional rows for it. @@ -59,7 +63,7 @@ class BrokenManifest(ManifestInterface): @property def blob_digests(self): return [] - + @property def local_blob_digests(self): return [] @@ -98,54 +102,104 @@ class BrokenManifest(ManifestInterface): def get_requires_empty_layer_blob(self, content_retriever): return False - def convert_manifest(self, media_types, namespace_name, repo_name, tag_name, lookup_fn): + def convert_manifest(self, allowed_mediatypes, namespace_name, repo_name, tag_name, + content_retriever): return None -class ManifestBackfillWorker(Worker): +class TagBackfillWorker(Worker): def __init__(self): - super(ManifestBackfillWorker, self).__init__() - self.add_operation(self._backfill_manifests, WORKER_TIMEOUT) + super(TagBackfillWorker, self).__init__() + self.add_operation(self._backfill_tags, WORKER_TIMEOUT) def _candidates_to_backfill(self): def missing_tmt_query(): - return (TagManifest + return (RepositoryTag .select() - .join(TagManifestToManifest, JOIN.LEFT_OUTER) - .where(TagManifestToManifest.id >> None)) + .join(TagToRepositoryTag, JOIN.LEFT_OUTER) + .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False)) - min_id = (TagManifest - .select(fn.Min(TagManifest.id)) - .join(TagManifestToManifest, JOIN.LEFT_OUTER) - .where(TagManifestToManifest.id >> None) + min_id = (RepositoryTag + .select(fn.Min(RepositoryTag.id)) + .join(TagToRepositoryTag, JOIN.LEFT_OUTER) + .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False) .scalar()) - max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar() + max_id = RepositoryTag.select(fn.Max(RepositoryTag.id)).scalar() iterator = yield_random_entries( missing_tmt_query, - TagManifest.id, - 100, + RepositoryTag.id, + 1000, max_id, min_id, ) return iterator - def _backfill_manifests(self): + def _backfill_tags(self): with UseThenDisconnect(app.config): iterator = self._candidates_to_backfill() if iterator is None: - logger.debug('Found no additional manifest to backfill') + logger.debug('Found no additional tags to backfill') time.sleep(10000) return None for candidate, abt, _ in iterator: - if not backfill_manifest(candidate): - logger.info('Another worker pre-empted us for manifest: %s', candidate.id) + if not backfill_tag(candidate): + logger.info('Another worker pre-empted us for label: %s', candidate.id) abt.set() -def lookup_map_row(tag_manifest): +def lookup_map_row(repositorytag): + try: + TagToRepositoryTag.get(repository_tag=repositorytag) + return True + except TagToRepositoryTag.DoesNotExist: + return False + + +def backfill_tag(repositorytag): + logger.info('Backfilling tag %s', repositorytag.id) + + # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. + if lookup_map_row(repositorytag): + return False + + # Grab the manifest for the RepositoryTag, backfilling is necessary. + manifest_id = _get_manifest_id(repositorytag) + if manifest_id is None: + return False + + lifetime_start_ms = (repositorytag.lifetime_start_ts * 1000 + if repositorytag.lifetime_start_ts else None) + lifetime_end_ms = (repositorytag.lifetime_end_ts * 1000 + if repositorytag.lifetime_end_ts else None) + + # Create the new Tag. + with db_transaction(): + if lookup_map_row(repositorytag): + return False + + try: + created = Tag.create(name=repositorytag.name, + repository=repositorytag.repository, + lifetime_start_ms=lifetime_start_ms, + lifetime_end_ms=lifetime_end_ms, + reversion=repositorytag.reversion, + manifest=manifest_id, + tag_kind=Tag.tag_kind.get_id('tag')) + + TagToRepositoryTag.create(tag=created, repository_tag=repositorytag, + repository=repositorytag.repository) + except IntegrityError: + logger.exception('Could not create tag for repo tag `%s`', repositorytag.id) + return False + + logger.info('Backfilled tag %s', repositorytag.id) + return True + + +def lookup_manifest_map_row(tag_manifest): try: TagManifestToManifest.get(tag_manifest=tag_manifest) return True @@ -153,11 +207,42 @@ def lookup_map_row(tag_manifest): return False -def backfill_manifest(tag_manifest): - logger.info('Backfilling manifest %s', tag_manifest.id) +def _get_manifest_id(repositorytag): + repository_tag_datatype = TagDataType.for_repository_tag(repositorytag) + + # Retrieve the TagManifest for the RepositoryTag, backfilling if necessary. + with db_transaction(): + manifest_datatype = pre_oci_model.get_manifest_for_tag(repository_tag_datatype, + backfill_if_necessary=True) + if manifest_datatype is None: + logger.error('Missing manifest for tag `%s`', repositorytag.id) + return None + + # Retrieve the new-style Manifest for the TagManifest, if any. + try: + tag_manifest = TagManifest.get(id=manifest_datatype._db_id) + except TagManifest.DoesNotExist: + logger.exception('Could not find tag manifest') + return None + + try: + return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id + except TagManifestToManifest.DoesNotExist: + # Could not find the new style manifest, so backfill. + _backfill_manifest(tag_manifest) + + # Try to retrieve the manifest again, since we've performed a backfill. + try: + return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id + except TagManifestToManifest.DoesNotExist: + return None + + +def _backfill_manifest(tag_manifest): + logger.info('Backfilling manifest for tag manifest %s', tag_manifest.id) # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. - if lookup_map_row(tag_manifest): + if lookup_manifest_map_row(tag_manifest): return False # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it @@ -214,7 +299,7 @@ def backfill_manifest(tag_manifest): return True # Ensure it wasn't already created. - if lookup_map_row(tag_manifest): + if lookup_manifest_map_row(tag_manifest): return False # Check for a pre-existing manifest matching the digest in the repository. This can happen @@ -235,18 +320,46 @@ def backfill_manifest(tag_manifest): try: TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row, broken=is_broken) - return True except IntegrityError: return False + # Backfill any labels on the manifest. + _backfill_labels(tag_manifest, manifest_row, repository) + return True + + +def _backfill_labels(tag_manifest, manifest, repository): + tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest)) + if not tmls: + return + + for tag_manifest_label in tmls: + label = tag_manifest_label.label + try: + TagManifestLabelMap.get(tag_manifest_label=tag_manifest_label) + continue + except TagManifestLabelMap.DoesNotExist: + pass + + try: + manifest_label = ManifestLabel.create(manifest=manifest, label=label, + repository=repository) + TagManifestLabelMap.create(manifest_label=manifest_label, + tag_manifest_label=tag_manifest_label, + label=label, + manifest=manifest, + tag_manifest=tag_manifest_label.annotated) + except IntegrityError: + continue + if __name__ == "__main__": logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if not app.config.get('BACKFILL_TAG_MANIFESTS', False): - logger.debug('Manifest backfill disabled; skipping') + if not app.config.get('BACKFILL_TAGS', False): + logger.debug('Tag backfill disabled; skipping') while True: time.sleep(100000) - worker = ManifestBackfillWorker() + worker = TagBackfillWorker() worker.start() diff --git a/workers/test/test_manifestbackfillworker.py b/workers/test/test_tagbackfillworker.py similarity index 75% rename from workers/test/test_manifestbackfillworker.py rename to workers/test/test_tagbackfillworker.py index c1c31d6bf..0dd2d2e70 100644 --- a/workers/test/test_manifestbackfillworker.py +++ b/workers/test/test_tagbackfillworker.py @@ -4,11 +4,11 @@ from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image, TagManifestLabel, Tag, TagToRepositoryTag) from image.docker.schema1 import DockerSchema1ManifestBuilder -from workers.manifestbackfillworker import backfill_manifest -from workers.labelbackfillworker import backfill_label +from workers.tagbackfillworker import backfill_tag, _backfill_manifest from test.fixtures import * + @pytest.fixture() def clear_rows(initialized_db): # Remove all new-style rows so we can backfill. @@ -22,15 +22,56 @@ def clear_rows(initialized_db): Manifest.delete().execute() -def test_manifestbackfillworker(clear_rows, initialized_db): - for tag_manifest in TagManifest.select(): - # Backfill the manifest. - assert backfill_manifest(tag_manifest) +@pytest.mark.parametrize('clear_all_rows', [ + True, + False, +]) +def test_tagbackfillworker(clear_all_rows, initialized_db): + # Remove the new-style rows so we can backfill. + TagToRepositoryTag.delete().execute() + Tag.delete().execute() + + if clear_all_rows: + TagManifestLabelMap.delete().execute() + ManifestLabel.delete().execute() + ManifestBlob.delete().execute() + ManifestLegacyImage.delete().execute() + TagManifestToManifest.delete().execute() + Manifest.delete().execute() + + for repository_tag in list(RepositoryTag.select()): + # Backfill the tag. + assert backfill_tag(repository_tag) # Ensure if we try again, the backfill is skipped. - assert not backfill_manifest(tag_manifest) + assert not backfill_tag(repository_tag) + + # Ensure that we now have the expected tag rows. + tag_to_repo_tag = TagToRepositoryTag.get(repository_tag=repository_tag) + tag = tag_to_repo_tag.tag + assert tag.name == repository_tag.name + assert tag.repository == repository_tag.repository + assert not tag.hidden + assert tag.reversion == repository_tag.reversion + + if repository_tag.lifetime_start_ts is None: + assert tag.lifetime_start_ms is None + else: + assert tag.lifetime_start_ms == (repository_tag.lifetime_start_ts * 1000) + + if repository_tag.lifetime_end_ts is None: + assert tag.lifetime_end_ms is None + else: + assert tag.lifetime_end_ms == (repository_tag.lifetime_end_ts * 1000) + + assert tag.manifest # Ensure that we now have the expected manifest rows. + try: + tag_manifest = TagManifest.get(tag=repository_tag) + except TagManifest.DoesNotExist: + continue + map_row = TagManifestToManifest.get(tag_manifest=tag_manifest) assert not map_row.broken @@ -39,6 +80,8 @@ def test_manifestbackfillworker(clear_rows, initialized_db): assert manifest_row.digest == tag_manifest.digest assert manifest_row.repository == tag_manifest.tag.repository + assert tag.manifest == map_row.manifest + legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image assert tag_manifest.tag.image == legacy_image @@ -50,15 +93,12 @@ def test_manifestbackfillworker(clear_rows, initialized_db): in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)} assert expected_storages == found_storages - # Ensure that backfilling labels now works. - for tml in TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest): - assert backfill_label(tml) - - label_map = TagManifestLabelMap.get(tag_manifest_label=tml) - assert label_map.tag_manifest == tag_manifest - assert label_map.manifest == manifest_row - assert label_map.manifest_label.label == label_map.tag_manifest_label.label - assert label_map.label == tml.label + # Ensure the labels were copied over. + tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest)) + expected_labels = {tml.label_id for tml in tmls} + found_labels = {m.label_id for m + in ManifestLabel.select().where(ManifestLabel.manifest == manifest_row)} + assert found_labels == expected_labels def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db): @@ -69,9 +109,9 @@ def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db): # Add a broken manifest. broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar', tag=RepositoryTag.get()) - + # Ensure the backfill works. - assert backfill_manifest(broken_manifest) + assert _backfill_manifest(broken_manifest) # Ensure the mapping is marked as broken. map_row = TagManifestToManifest.get(tag_manifest=broken_manifest) @@ -106,7 +146,7 @@ def test_manifestbackfillworker_mislinked_manifest(clear_rows, initialized_db): tag=tag_v50) # Backfill the manifest and ensure its proper content checksum was linked. - assert backfill_manifest(mislinked_manifest) + assert _backfill_manifest(mislinked_manifest) map_row = TagManifestToManifest.get(tag_manifest=mislinked_manifest) assert not map_row.broken @@ -140,7 +180,7 @@ def test_manifestbackfillworker_mislinked_invalid_manifest(clear_rows, initializ tag=tag_v50) # Backfill the manifest and ensure it is marked as broken. - assert backfill_manifest(broken_manifest) + assert _backfill_manifest(broken_manifest) map_row = TagManifestToManifest.get(tag_manifest=broken_manifest) assert map_row.broken @@ -174,8 +214,8 @@ def test_manifestbackfillworker_repeat_digest(clear_rows, initialized_db): tag=tag_v50) # Backfill "both" manifests and ensure both are pointed to by a single resulting row. - assert backfill_manifest(manifest_1) - assert backfill_manifest(manifest_2) + assert _backfill_manifest(manifest_1) + assert _backfill_manifest(manifest_2) map_row1 = TagManifestToManifest.get(tag_manifest=manifest_1) map_row2 = TagManifestToManifest.get(tag_manifest=manifest_2)