Merge pull request #3302 from quay/joseph.schorr/QUAY-1017/tag-backfill
Tag backfill worker
This commit is contained in:
commit
be2cece7b0
7 changed files with 220 additions and 67 deletions
|
@ -1,4 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
# Start the logger
|
||||
exec logger -i -t manifestbackfillworker
|
|
@ -1,9 +0,0 @@
|
|||
#! /bin/bash
|
||||
|
||||
echo 'Starting manifest backfill worker'
|
||||
|
||||
QUAYPATH=${QUAYPATH:-"."}
|
||||
cd ${QUAYDIR:-"/"}
|
||||
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.manifestbackfillworker 2>&1
|
||||
|
||||
echo 'Repository manifest backfill exited'
|
4
conf/init/service/batch/tagbackfillworker/log/run
Executable file
4
conf/init/service/batch/tagbackfillworker/log/run
Executable file
|
@ -0,0 +1,4 @@
|
|||
#!/bin/sh
|
||||
|
||||
# Start the logger
|
||||
exec logger -i -t tagbackfillworker
|
9
conf/init/service/batch/tagbackfillworker/run
Executable file
9
conf/init/service/batch/tagbackfillworker/run
Executable file
|
@ -0,0 +1,9 @@
|
|||
#! /bin/bash
|
||||
|
||||
echo 'Starting tag backfill worker'
|
||||
|
||||
QUAYPATH=${QUAYPATH:-"."}
|
||||
cd ${QUAYDIR:-"/"}
|
||||
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.tagbackfillworker 2>&1
|
||||
|
||||
echo 'Repository tag backfill exited'
|
|
@ -64,7 +64,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
|
|||
if backfill_if_necessary:
|
||||
return self.backfill_manifest_for_tag(tag)
|
||||
|
||||
return
|
||||
return None
|
||||
|
||||
return Manifest.for_tag_manifest(tag_manifest)
|
||||
|
||||
|
|
|
@ -1,20 +1,23 @@
|
|||
import logging
|
||||
import logging.config
|
||||
import time
|
||||
|
||||
import time
|
||||
|
||||
from peewee import JOIN, fn, IntegrityError
|
||||
|
||||
from app import app
|
||||
from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image,
|
||||
Manifest, db_transaction)
|
||||
from data.database import (UseThenDisconnect, TagToRepositoryTag, RepositoryTag,
|
||||
TagManifestToManifest, Tag, TagManifest, TagManifestToManifest, Image,
|
||||
Manifest, TagManifestLabel, ManifestLabel, TagManifestLabelMap, db_transaction)
|
||||
from data.model import DataModelException
|
||||
from data.model.image import get_parent_images
|
||||
from data.model.tag import populate_manifest
|
||||
from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist
|
||||
from data.registry_model import pre_oci_model
|
||||
from data.registry_model.datatypes import Tag as TagDataType
|
||||
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
|
||||
DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
|
||||
|
||||
from workers.worker import Worker
|
||||
from util.log import logfile_path
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
|
@ -23,6 +26,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
WORKER_TIMEOUT = 600
|
||||
|
||||
|
||||
class BrokenManifest(ManifestInterface):
|
||||
""" Implementation of the ManifestInterface for "broken" manifests. This allows us to add the
|
||||
new manifest row while not adding any additional rows for it.
|
||||
|
@ -59,7 +63,7 @@ class BrokenManifest(ManifestInterface):
|
|||
@property
|
||||
def blob_digests(self):
|
||||
return []
|
||||
|
||||
|
||||
@property
|
||||
def local_blob_digests(self):
|
||||
return []
|
||||
|
@ -98,54 +102,104 @@ class BrokenManifest(ManifestInterface):
|
|||
def get_requires_empty_layer_blob(self, content_retriever):
|
||||
return False
|
||||
|
||||
def convert_manifest(self, media_types, namespace_name, repo_name, tag_name, lookup_fn):
|
||||
def convert_manifest(self, allowed_mediatypes, namespace_name, repo_name, tag_name,
|
||||
content_retriever):
|
||||
return None
|
||||
|
||||
|
||||
class ManifestBackfillWorker(Worker):
|
||||
class TagBackfillWorker(Worker):
|
||||
def __init__(self):
|
||||
super(ManifestBackfillWorker, self).__init__()
|
||||
self.add_operation(self._backfill_manifests, WORKER_TIMEOUT)
|
||||
super(TagBackfillWorker, self).__init__()
|
||||
self.add_operation(self._backfill_tags, WORKER_TIMEOUT)
|
||||
|
||||
def _candidates_to_backfill(self):
|
||||
def missing_tmt_query():
|
||||
return (TagManifest
|
||||
return (RepositoryTag
|
||||
.select()
|
||||
.join(TagManifestToManifest, JOIN.LEFT_OUTER)
|
||||
.where(TagManifestToManifest.id >> None))
|
||||
.join(TagToRepositoryTag, JOIN.LEFT_OUTER)
|
||||
.where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False))
|
||||
|
||||
min_id = (TagManifest
|
||||
.select(fn.Min(TagManifest.id))
|
||||
.join(TagManifestToManifest, JOIN.LEFT_OUTER)
|
||||
.where(TagManifestToManifest.id >> None)
|
||||
min_id = (RepositoryTag
|
||||
.select(fn.Min(RepositoryTag.id))
|
||||
.join(TagToRepositoryTag, JOIN.LEFT_OUTER)
|
||||
.where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False)
|
||||
.scalar())
|
||||
max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar()
|
||||
max_id = RepositoryTag.select(fn.Max(RepositoryTag.id)).scalar()
|
||||
|
||||
iterator = yield_random_entries(
|
||||
missing_tmt_query,
|
||||
TagManifest.id,
|
||||
100,
|
||||
RepositoryTag.id,
|
||||
1000,
|
||||
max_id,
|
||||
min_id,
|
||||
)
|
||||
|
||||
return iterator
|
||||
|
||||
def _backfill_manifests(self):
|
||||
def _backfill_tags(self):
|
||||
with UseThenDisconnect(app.config):
|
||||
iterator = self._candidates_to_backfill()
|
||||
if iterator is None:
|
||||
logger.debug('Found no additional manifest to backfill')
|
||||
logger.debug('Found no additional tags to backfill')
|
||||
time.sleep(10000)
|
||||
return None
|
||||
|
||||
for candidate, abt, _ in iterator:
|
||||
if not backfill_manifest(candidate):
|
||||
logger.info('Another worker pre-empted us for manifest: %s', candidate.id)
|
||||
if not backfill_tag(candidate):
|
||||
logger.info('Another worker pre-empted us for label: %s', candidate.id)
|
||||
abt.set()
|
||||
|
||||
|
||||
def lookup_map_row(tag_manifest):
|
||||
def lookup_map_row(repositorytag):
|
||||
try:
|
||||
TagToRepositoryTag.get(repository_tag=repositorytag)
|
||||
return True
|
||||
except TagToRepositoryTag.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
||||
def backfill_tag(repositorytag):
|
||||
logger.info('Backfilling tag %s', repositorytag.id)
|
||||
|
||||
# Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
|
||||
if lookup_map_row(repositorytag):
|
||||
return False
|
||||
|
||||
# Grab the manifest for the RepositoryTag, backfilling is necessary.
|
||||
manifest_id = _get_manifest_id(repositorytag)
|
||||
if manifest_id is None:
|
||||
return False
|
||||
|
||||
lifetime_start_ms = (repositorytag.lifetime_start_ts * 1000
|
||||
if repositorytag.lifetime_start_ts else None)
|
||||
lifetime_end_ms = (repositorytag.lifetime_end_ts * 1000
|
||||
if repositorytag.lifetime_end_ts else None)
|
||||
|
||||
# Create the new Tag.
|
||||
with db_transaction():
|
||||
if lookup_map_row(repositorytag):
|
||||
return False
|
||||
|
||||
try:
|
||||
created = Tag.create(name=repositorytag.name,
|
||||
repository=repositorytag.repository,
|
||||
lifetime_start_ms=lifetime_start_ms,
|
||||
lifetime_end_ms=lifetime_end_ms,
|
||||
reversion=repositorytag.reversion,
|
||||
manifest=manifest_id,
|
||||
tag_kind=Tag.tag_kind.get_id('tag'))
|
||||
|
||||
TagToRepositoryTag.create(tag=created, repository_tag=repositorytag,
|
||||
repository=repositorytag.repository)
|
||||
except IntegrityError:
|
||||
logger.exception('Could not create tag for repo tag `%s`', repositorytag.id)
|
||||
return False
|
||||
|
||||
logger.info('Backfilled tag %s', repositorytag.id)
|
||||
return True
|
||||
|
||||
|
||||
def lookup_manifest_map_row(tag_manifest):
|
||||
try:
|
||||
TagManifestToManifest.get(tag_manifest=tag_manifest)
|
||||
return True
|
||||
|
@ -153,11 +207,42 @@ def lookup_map_row(tag_manifest):
|
|||
return False
|
||||
|
||||
|
||||
def backfill_manifest(tag_manifest):
|
||||
logger.info('Backfilling manifest %s', tag_manifest.id)
|
||||
def _get_manifest_id(repositorytag):
|
||||
repository_tag_datatype = TagDataType.for_repository_tag(repositorytag)
|
||||
|
||||
# Retrieve the TagManifest for the RepositoryTag, backfilling if necessary.
|
||||
with db_transaction():
|
||||
manifest_datatype = pre_oci_model.get_manifest_for_tag(repository_tag_datatype,
|
||||
backfill_if_necessary=True)
|
||||
if manifest_datatype is None:
|
||||
logger.error('Missing manifest for tag `%s`', repositorytag.id)
|
||||
return None
|
||||
|
||||
# Retrieve the new-style Manifest for the TagManifest, if any.
|
||||
try:
|
||||
tag_manifest = TagManifest.get(id=manifest_datatype._db_id)
|
||||
except TagManifest.DoesNotExist:
|
||||
logger.exception('Could not find tag manifest')
|
||||
return None
|
||||
|
||||
try:
|
||||
return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id
|
||||
except TagManifestToManifest.DoesNotExist:
|
||||
# Could not find the new style manifest, so backfill.
|
||||
_backfill_manifest(tag_manifest)
|
||||
|
||||
# Try to retrieve the manifest again, since we've performed a backfill.
|
||||
try:
|
||||
return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id
|
||||
except TagManifestToManifest.DoesNotExist:
|
||||
return None
|
||||
|
||||
|
||||
def _backfill_manifest(tag_manifest):
|
||||
logger.info('Backfilling manifest for tag manifest %s', tag_manifest.id)
|
||||
|
||||
# Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
|
||||
if lookup_map_row(tag_manifest):
|
||||
if lookup_manifest_map_row(tag_manifest):
|
||||
return False
|
||||
|
||||
# Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it
|
||||
|
@ -214,7 +299,7 @@ def backfill_manifest(tag_manifest):
|
|||
return True
|
||||
|
||||
# Ensure it wasn't already created.
|
||||
if lookup_map_row(tag_manifest):
|
||||
if lookup_manifest_map_row(tag_manifest):
|
||||
return False
|
||||
|
||||
# Check for a pre-existing manifest matching the digest in the repository. This can happen
|
||||
|
@ -235,18 +320,46 @@ def backfill_manifest(tag_manifest):
|
|||
try:
|
||||
TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
|
||||
broken=is_broken)
|
||||
return True
|
||||
except IntegrityError:
|
||||
return False
|
||||
|
||||
# Backfill any labels on the manifest.
|
||||
_backfill_labels(tag_manifest, manifest_row, repository)
|
||||
return True
|
||||
|
||||
|
||||
def _backfill_labels(tag_manifest, manifest, repository):
|
||||
tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest))
|
||||
if not tmls:
|
||||
return
|
||||
|
||||
for tag_manifest_label in tmls:
|
||||
label = tag_manifest_label.label
|
||||
try:
|
||||
TagManifestLabelMap.get(tag_manifest_label=tag_manifest_label)
|
||||
continue
|
||||
except TagManifestLabelMap.DoesNotExist:
|
||||
pass
|
||||
|
||||
try:
|
||||
manifest_label = ManifestLabel.create(manifest=manifest, label=label,
|
||||
repository=repository)
|
||||
TagManifestLabelMap.create(manifest_label=manifest_label,
|
||||
tag_manifest_label=tag_manifest_label,
|
||||
label=label,
|
||||
manifest=manifest,
|
||||
tag_manifest=tag_manifest_label.annotated)
|
||||
except IntegrityError:
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
|
||||
|
||||
if not app.config.get('BACKFILL_TAG_MANIFESTS', False):
|
||||
logger.debug('Manifest backfill disabled; skipping')
|
||||
if not app.config.get('BACKFILL_TAGS', False):
|
||||
logger.debug('Tag backfill disabled; skipping')
|
||||
while True:
|
||||
time.sleep(100000)
|
||||
|
||||
worker = ManifestBackfillWorker()
|
||||
worker = TagBackfillWorker()
|
||||
worker.start()
|
|
@ -4,11 +4,11 @@ from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest,
|
|||
ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
|
||||
TagManifestLabel, Tag, TagToRepositoryTag)
|
||||
from image.docker.schema1 import DockerSchema1ManifestBuilder
|
||||
from workers.manifestbackfillworker import backfill_manifest
|
||||
from workers.labelbackfillworker import backfill_label
|
||||
from workers.tagbackfillworker import backfill_tag, _backfill_manifest
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def clear_rows(initialized_db):
|
||||
# Remove all new-style rows so we can backfill.
|
||||
|
@ -22,15 +22,56 @@ def clear_rows(initialized_db):
|
|||
Manifest.delete().execute()
|
||||
|
||||
|
||||
def test_manifestbackfillworker(clear_rows, initialized_db):
|
||||
for tag_manifest in TagManifest.select():
|
||||
# Backfill the manifest.
|
||||
assert backfill_manifest(tag_manifest)
|
||||
@pytest.mark.parametrize('clear_all_rows', [
|
||||
True,
|
||||
False,
|
||||
])
|
||||
def test_tagbackfillworker(clear_all_rows, initialized_db):
|
||||
# Remove the new-style rows so we can backfill.
|
||||
TagToRepositoryTag.delete().execute()
|
||||
Tag.delete().execute()
|
||||
|
||||
if clear_all_rows:
|
||||
TagManifestLabelMap.delete().execute()
|
||||
ManifestLabel.delete().execute()
|
||||
ManifestBlob.delete().execute()
|
||||
ManifestLegacyImage.delete().execute()
|
||||
TagManifestToManifest.delete().execute()
|
||||
Manifest.delete().execute()
|
||||
|
||||
for repository_tag in list(RepositoryTag.select()):
|
||||
# Backfill the tag.
|
||||
assert backfill_tag(repository_tag)
|
||||
|
||||
# Ensure if we try again, the backfill is skipped.
|
||||
assert not backfill_manifest(tag_manifest)
|
||||
assert not backfill_tag(repository_tag)
|
||||
|
||||
# Ensure that we now have the expected tag rows.
|
||||
tag_to_repo_tag = TagToRepositoryTag.get(repository_tag=repository_tag)
|
||||
tag = tag_to_repo_tag.tag
|
||||
assert tag.name == repository_tag.name
|
||||
assert tag.repository == repository_tag.repository
|
||||
assert not tag.hidden
|
||||
assert tag.reversion == repository_tag.reversion
|
||||
|
||||
if repository_tag.lifetime_start_ts is None:
|
||||
assert tag.lifetime_start_ms is None
|
||||
else:
|
||||
assert tag.lifetime_start_ms == (repository_tag.lifetime_start_ts * 1000)
|
||||
|
||||
if repository_tag.lifetime_end_ts is None:
|
||||
assert tag.lifetime_end_ms is None
|
||||
else:
|
||||
assert tag.lifetime_end_ms == (repository_tag.lifetime_end_ts * 1000)
|
||||
|
||||
assert tag.manifest
|
||||
|
||||
# Ensure that we now have the expected manifest rows.
|
||||
try:
|
||||
tag_manifest = TagManifest.get(tag=repository_tag)
|
||||
except TagManifest.DoesNotExist:
|
||||
continue
|
||||
|
||||
map_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
|
||||
assert not map_row.broken
|
||||
|
||||
|
@ -39,6 +80,8 @@ def test_manifestbackfillworker(clear_rows, initialized_db):
|
|||
assert manifest_row.digest == tag_manifest.digest
|
||||
assert manifest_row.repository == tag_manifest.tag.repository
|
||||
|
||||
assert tag.manifest == map_row.manifest
|
||||
|
||||
legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
|
||||
assert tag_manifest.tag.image == legacy_image
|
||||
|
||||
|
@ -50,15 +93,12 @@ def test_manifestbackfillworker(clear_rows, initialized_db):
|
|||
in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)}
|
||||
assert expected_storages == found_storages
|
||||
|
||||
# Ensure that backfilling labels now works.
|
||||
for tml in TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest):
|
||||
assert backfill_label(tml)
|
||||
|
||||
label_map = TagManifestLabelMap.get(tag_manifest_label=tml)
|
||||
assert label_map.tag_manifest == tag_manifest
|
||||
assert label_map.manifest == manifest_row
|
||||
assert label_map.manifest_label.label == label_map.tag_manifest_label.label
|
||||
assert label_map.label == tml.label
|
||||
# Ensure the labels were copied over.
|
||||
tmls = list(TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest))
|
||||
expected_labels = {tml.label_id for tml in tmls}
|
||||
found_labels = {m.label_id for m
|
||||
in ManifestLabel.select().where(ManifestLabel.manifest == manifest_row)}
|
||||
assert found_labels == expected_labels
|
||||
|
||||
|
||||
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
|
||||
|
@ -69,9 +109,9 @@ def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
|
|||
# Add a broken manifest.
|
||||
broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar',
|
||||
tag=RepositoryTag.get())
|
||||
|
||||
|
||||
# Ensure the backfill works.
|
||||
assert backfill_manifest(broken_manifest)
|
||||
assert _backfill_manifest(broken_manifest)
|
||||
|
||||
# Ensure the mapping is marked as broken.
|
||||
map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
|
||||
|
@ -106,7 +146,7 @@ def test_manifestbackfillworker_mislinked_manifest(clear_rows, initialized_db):
|
|||
tag=tag_v50)
|
||||
|
||||
# Backfill the manifest and ensure its proper content checksum was linked.
|
||||
assert backfill_manifest(mislinked_manifest)
|
||||
assert _backfill_manifest(mislinked_manifest)
|
||||
|
||||
map_row = TagManifestToManifest.get(tag_manifest=mislinked_manifest)
|
||||
assert not map_row.broken
|
||||
|
@ -140,7 +180,7 @@ def test_manifestbackfillworker_mislinked_invalid_manifest(clear_rows, initializ
|
|||
tag=tag_v50)
|
||||
|
||||
# Backfill the manifest and ensure it is marked as broken.
|
||||
assert backfill_manifest(broken_manifest)
|
||||
assert _backfill_manifest(broken_manifest)
|
||||
|
||||
map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
|
||||
assert map_row.broken
|
||||
|
@ -174,8 +214,8 @@ def test_manifestbackfillworker_repeat_digest(clear_rows, initialized_db):
|
|||
tag=tag_v50)
|
||||
|
||||
# Backfill "both" manifests and ensure both are pointed to by a single resulting row.
|
||||
assert backfill_manifest(manifest_1)
|
||||
assert backfill_manifest(manifest_2)
|
||||
assert _backfill_manifest(manifest_1)
|
||||
assert _backfill_manifest(manifest_2)
|
||||
|
||||
map_row1 = TagManifestToManifest.get(tag_manifest=manifest_1)
|
||||
map_row2 = TagManifestToManifest.get(tag_manifest=manifest_2)
|
Reference in a new issue