diff --git a/conf/init/service/batch/labelbackfillworker/log/run b/conf/init/service/batch/labelbackfillworker/log/run new file mode 100755 index 000000000..2437a88f1 --- /dev/null +++ b/conf/init/service/batch/labelbackfillworker/log/run @@ -0,0 +1,4 @@ +#!/bin/sh + +# Start the logger +exec logger -i -t labelbackfillworker \ No newline at end of file diff --git a/conf/init/service/batch/labelbackfillworker/run b/conf/init/service/batch/labelbackfillworker/run new file mode 100755 index 000000000..1b7c3d799 --- /dev/null +++ b/conf/init/service/batch/labelbackfillworker/run @@ -0,0 +1,9 @@ +#! /bin/bash + +echo 'Starting label backfill worker' + +QUAYPATH=${QUAYPATH:-"."} +cd ${QUAYDIR:-"/"} +PYTHONPATH=$QUAYPATH venv/bin/python -m workers.labelbackfillworker 2>&1 + +echo 'Repository label backfill exited' diff --git a/workers/labelbackfillworker.py b/workers/labelbackfillworker.py new file mode 100644 index 000000000..b2407f606 --- /dev/null +++ b/workers/labelbackfillworker.py @@ -0,0 +1,120 @@ +import logging +import logging.config + +import time + +from peewee import JOIN, fn, IntegrityError + +from app import app +from data.database import (UseThenDisconnect, TagManifestLabel, TagManifestLabelMap, + TagManifestToManifest, ManifestLabel, db_transaction) +from workers.worker import Worker +from util.log import logfile_path +from util.migrate.allocator import yield_random_entries + +logger = logging.getLogger(__name__) + +WORKER_TIMEOUT = 600 + +class LabelBackfillWorker(Worker): + def __init__(self): + super(LabelBackfillWorker, self).__init__() + self.add_operation(self._backfill_labels, WORKER_TIMEOUT) + + def _candidates_to_backfill(self): + def missing_tmt_query(): + return (TagManifestLabel + .select() + .join(TagManifestLabelMap, JOIN.LEFT_OUTER) + .where(TagManifestLabelMap.id >> None)) + + min_id = (TagManifestLabel + .select(fn.Min(TagManifestLabel.id)) + .join(TagManifestLabelMap, JOIN.LEFT_OUTER) + .where(TagManifestLabelMap.id >> None) + .scalar()) + max_id = TagManifestLabel.select(fn.Max(TagManifestLabel.id)).scalar() + + iterator = yield_random_entries( + missing_tmt_query, + TagManifestLabel.id, + 100, + max_id, + min_id, + ) + + return iterator + + def _backfill_labels(self): + with UseThenDisconnect(app.config): + iterator = self._candidates_to_backfill() + if iterator is None: + logger.debug('Found no additional labels to backfill') + time.sleep(10000) + return None + + for candidate, abt, _ in iterator: + if not backfill_label(candidate): + logger.info('Another worker pre-empted us for label: %s', candidate.id) + abt.set() + + + +def lookup_map_row(tag_manifest_label): + try: + TagManifestLabelMap.get(tag_manifest_label=tag_manifest_label) + return True + except TagManifestLabelMap.DoesNotExist: + return False + + +def backfill_label(tag_manifest_label): + logger.info('Backfilling label %s', tag_manifest_label.id) + + # Ensure that a mapping row doesn't already exist. If it does, we've been preempted. + if lookup_map_row(tag_manifest_label): + return False + + # Ensure the tag manifest has been backfilled into the manifest table. + try: + tmt = TagManifestToManifest.get(tag_manifest=tag_manifest_label.annotated) + except TagManifestToManifest.DoesNotExist: + # We'll come back to this later. + logger.debug('Tag Manifest %s for label %s has not yet been backfilled', + tag_manifest_label.annotated.id, tag_manifest_label.id) + return True + + repository = tag_manifest_label.repository + + # Create the new mapping entry and label. + with db_transaction(): + if lookup_map_row(tag_manifest_label): + return False + + label = tag_manifest_label.label + if tmt.manifest: + try: + manifest_label = ManifestLabel.create(manifest=tmt.manifest, label=label, + repository=repository) + TagManifestLabelMap.create(manifest_label=manifest_label, + tag_manifest_label=tag_manifest_label, + label=label, + manifest=tmt.manifest, + tag_manifest=tag_manifest_label.annotated) + except IntegrityError: + return False + + logger.info('Backfilled label %s', tag_manifest_label.id) + return True + + +if __name__ == "__main__": + logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) + + if not app.config.get('BACKFILL_TAG_MANIFEST_LABELS', False): + logger.debug('Manifest label backfill disabled; skipping') + while True: + time.sleep(100000) + + worker = LabelBackfillWorker() + worker.start() diff --git a/workers/manifestbackfillworker.py b/workers/manifestbackfillworker.py index 391b1879c..8593fccdd 100644 --- a/workers/manifestbackfillworker.py +++ b/workers/manifestbackfillworker.py @@ -2,6 +2,8 @@ import logging import logging.config import time +import time + from peewee import JOIN, fn, IntegrityError from app import app @@ -88,11 +90,11 @@ class ManifestBackfillWorker(Worker): return iterator def _backfill_manifests(self): - """ Performs garbage collection on repositories. """ with UseThenDisconnect(app.config): iterator = self._candidates_to_backfill() if iterator is None: - logger.debug('Found no additional images to scan') + logger.debug('Found no additional manifest to backfill') + time.sleep(10000) return None for candidate, abt, _ in iterator: diff --git a/workers/test/test_manifestbackfillworker.py b/workers/test/test_manifestbackfillworker.py index 4d8a9182e..58a918524 100644 --- a/workers/test/test_manifestbackfillworker.py +++ b/workers/test/test_manifestbackfillworker.py @@ -5,6 +5,7 @@ from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, TagManifestLabel) from image.docker.schema1 import DockerSchema1ManifestBuilder from workers.manifestbackfillworker import backfill_manifest +from workers.labelbackfillworker import backfill_label from test.fixtures import * @@ -47,6 +48,16 @@ def test_manifestbackfillworker(clear_rows, initialized_db): in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)} assert expected_storages == found_storages + # Ensure that backfilling labels now works. + for tml in TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest): + assert backfill_label(tml) + + label_map = TagManifestLabelMap.get(tag_manifest_label=tml) + assert label_map.tag_manifest == tag_manifest + assert label_map.manifest == manifest_row + assert label_map.manifest_label.label == label_map.tag_manifest_label.label + assert label_map.label == tml.label + def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db): # Delete existing tag manifest so we can reuse the tag.