Add a worker for backfilling labels on manifests that have already been backfilled
This commit is contained in:
parent
01ee1e693c
commit
aeceea0f97
5 changed files with 148 additions and 2 deletions
4
conf/init/service/batch/labelbackfillworker/log/run
Executable file
4
conf/init/service/batch/labelbackfillworker/log/run
Executable file
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
# Start the logger
|
||||||
|
exec logger -i -t labelbackfillworker
|
9
conf/init/service/batch/labelbackfillworker/run
Executable file
9
conf/init/service/batch/labelbackfillworker/run
Executable file
|
@ -0,0 +1,9 @@
|
||||||
|
#! /bin/bash
|
||||||
|
|
||||||
|
echo 'Starting label backfill worker'
|
||||||
|
|
||||||
|
QUAYPATH=${QUAYPATH:-"."}
|
||||||
|
cd ${QUAYDIR:-"/"}
|
||||||
|
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.labelbackfillworker 2>&1
|
||||||
|
|
||||||
|
echo 'Repository label backfill exited'
|
120
workers/labelbackfillworker.py
Normal file
120
workers/labelbackfillworker.py
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from peewee import JOIN, fn, IntegrityError
|
||||||
|
|
||||||
|
from app import app
|
||||||
|
from data.database import (UseThenDisconnect, TagManifestLabel, TagManifestLabelMap,
|
||||||
|
TagManifestToManifest, ManifestLabel, db_transaction)
|
||||||
|
from workers.worker import Worker
|
||||||
|
from util.log import logfile_path
|
||||||
|
from util.migrate.allocator import yield_random_entries
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
WORKER_TIMEOUT = 600
|
||||||
|
|
||||||
|
class LabelBackfillWorker(Worker):
|
||||||
|
def __init__(self):
|
||||||
|
super(LabelBackfillWorker, self).__init__()
|
||||||
|
self.add_operation(self._backfill_labels, WORKER_TIMEOUT)
|
||||||
|
|
||||||
|
def _candidates_to_backfill(self):
|
||||||
|
def missing_tmt_query():
|
||||||
|
return (TagManifestLabel
|
||||||
|
.select()
|
||||||
|
.join(TagManifestLabelMap, JOIN.LEFT_OUTER)
|
||||||
|
.where(TagManifestLabelMap.id >> None))
|
||||||
|
|
||||||
|
min_id = (TagManifestLabel
|
||||||
|
.select(fn.Min(TagManifestLabel.id))
|
||||||
|
.join(TagManifestLabelMap, JOIN.LEFT_OUTER)
|
||||||
|
.where(TagManifestLabelMap.id >> None)
|
||||||
|
.scalar())
|
||||||
|
max_id = TagManifestLabel.select(fn.Max(TagManifestLabel.id)).scalar()
|
||||||
|
|
||||||
|
iterator = yield_random_entries(
|
||||||
|
missing_tmt_query,
|
||||||
|
TagManifestLabel.id,
|
||||||
|
100,
|
||||||
|
max_id,
|
||||||
|
min_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
return iterator
|
||||||
|
|
||||||
|
def _backfill_labels(self):
|
||||||
|
with UseThenDisconnect(app.config):
|
||||||
|
iterator = self._candidates_to_backfill()
|
||||||
|
if iterator is None:
|
||||||
|
logger.debug('Found no additional labels to backfill')
|
||||||
|
time.sleep(10000)
|
||||||
|
return None
|
||||||
|
|
||||||
|
for candidate, abt, _ in iterator:
|
||||||
|
if not backfill_label(candidate):
|
||||||
|
logger.info('Another worker pre-empted us for label: %s', candidate.id)
|
||||||
|
abt.set()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def lookup_map_row(tag_manifest_label):
|
||||||
|
try:
|
||||||
|
TagManifestLabelMap.get(tag_manifest_label=tag_manifest_label)
|
||||||
|
return True
|
||||||
|
except TagManifestLabelMap.DoesNotExist:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def backfill_label(tag_manifest_label):
|
||||||
|
logger.info('Backfilling label %s', tag_manifest_label.id)
|
||||||
|
|
||||||
|
# Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
|
||||||
|
if lookup_map_row(tag_manifest_label):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Ensure the tag manifest has been backfilled into the manifest table.
|
||||||
|
try:
|
||||||
|
tmt = TagManifestToManifest.get(tag_manifest=tag_manifest_label.annotated)
|
||||||
|
except TagManifestToManifest.DoesNotExist:
|
||||||
|
# We'll come back to this later.
|
||||||
|
logger.debug('Tag Manifest %s for label %s has not yet been backfilled',
|
||||||
|
tag_manifest_label.annotated.id, tag_manifest_label.id)
|
||||||
|
return True
|
||||||
|
|
||||||
|
repository = tag_manifest_label.repository
|
||||||
|
|
||||||
|
# Create the new mapping entry and label.
|
||||||
|
with db_transaction():
|
||||||
|
if lookup_map_row(tag_manifest_label):
|
||||||
|
return False
|
||||||
|
|
||||||
|
label = tag_manifest_label.label
|
||||||
|
if tmt.manifest:
|
||||||
|
try:
|
||||||
|
manifest_label = ManifestLabel.create(manifest=tmt.manifest, label=label,
|
||||||
|
repository=repository)
|
||||||
|
TagManifestLabelMap.create(manifest_label=manifest_label,
|
||||||
|
tag_manifest_label=tag_manifest_label,
|
||||||
|
label=label,
|
||||||
|
manifest=tmt.manifest,
|
||||||
|
tag_manifest=tag_manifest_label.annotated)
|
||||||
|
except IntegrityError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
logger.info('Backfilled label %s', tag_manifest_label.id)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
|
||||||
|
|
||||||
|
if not app.config.get('BACKFILL_TAG_MANIFEST_LABELS', False):
|
||||||
|
logger.debug('Manifest label backfill disabled; skipping')
|
||||||
|
while True:
|
||||||
|
time.sleep(100000)
|
||||||
|
|
||||||
|
worker = LabelBackfillWorker()
|
||||||
|
worker.start()
|
|
@ -2,6 +2,8 @@ import logging
|
||||||
import logging.config
|
import logging.config
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
from peewee import JOIN, fn, IntegrityError
|
from peewee import JOIN, fn, IntegrityError
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
|
@ -88,11 +90,11 @@ class ManifestBackfillWorker(Worker):
|
||||||
return iterator
|
return iterator
|
||||||
|
|
||||||
def _backfill_manifests(self):
|
def _backfill_manifests(self):
|
||||||
""" Performs garbage collection on repositories. """
|
|
||||||
with UseThenDisconnect(app.config):
|
with UseThenDisconnect(app.config):
|
||||||
iterator = self._candidates_to_backfill()
|
iterator = self._candidates_to_backfill()
|
||||||
if iterator is None:
|
if iterator is None:
|
||||||
logger.debug('Found no additional images to scan')
|
logger.debug('Found no additional manifest to backfill')
|
||||||
|
time.sleep(10000)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
for candidate, abt, _ in iterator:
|
for candidate, abt, _ in iterator:
|
||||||
|
|
|
@ -5,6 +5,7 @@ from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest,
|
||||||
TagManifestLabel)
|
TagManifestLabel)
|
||||||
from image.docker.schema1 import DockerSchema1ManifestBuilder
|
from image.docker.schema1 import DockerSchema1ManifestBuilder
|
||||||
from workers.manifestbackfillworker import backfill_manifest
|
from workers.manifestbackfillworker import backfill_manifest
|
||||||
|
from workers.labelbackfillworker import backfill_label
|
||||||
|
|
||||||
from test.fixtures import *
|
from test.fixtures import *
|
||||||
|
|
||||||
|
@ -47,6 +48,16 @@ def test_manifestbackfillworker(clear_rows, initialized_db):
|
||||||
in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)}
|
in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)}
|
||||||
assert expected_storages == found_storages
|
assert expected_storages == found_storages
|
||||||
|
|
||||||
|
# Ensure that backfilling labels now works.
|
||||||
|
for tml in TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest):
|
||||||
|
assert backfill_label(tml)
|
||||||
|
|
||||||
|
label_map = TagManifestLabelMap.get(tag_manifest_label=tml)
|
||||||
|
assert label_map.tag_manifest == tag_manifest
|
||||||
|
assert label_map.manifest == manifest_row
|
||||||
|
assert label_map.manifest_label.label == label_map.tag_manifest_label.label
|
||||||
|
assert label_map.label == tml.label
|
||||||
|
|
||||||
|
|
||||||
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
|
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
|
||||||
# Delete existing tag manifest so we can reuse the tag.
|
# Delete existing tag manifest so we can reuse the tag.
|
||||||
|
|
Reference in a new issue