Add a tag backfill worker to fully backfill the new-style Tags in the background

This commit is contained in:
Joseph Schorr 2018-12-06 16:30:47 -05:00
parent 57e93a82c9
commit eb7591183d
5 changed files with 598 additions and 1 deletions

View file

@ -0,0 +1,4 @@
#!/bin/sh
# Start the logger.
# `exec` replaces this shell with logger(1); with no message argument logger forwards its
# standard input to syslog. `-t` tags every entry "tagbackfillworker" and `-i` adds the
# logger's process ID to each entry.
# NOTE(review): presumably the service supervisor pipes the worker's output into this
# script's stdin -- confirm against the service configuration.
exec logger -i -t tagbackfillworker

View file

@ -0,0 +1,9 @@
#! /bin/bash
# Runs the tag backfill worker in the foreground.
#
# Environment:
#   QUAYPATH  value placed on PYTHONPATH for the worker (defaults to ".").
#   QUAYDIR   directory to run from; the venv is resolved relative to it (defaults to "/").

echo 'Starting tag backfill worker'

QUAYPATH=${QUAYPATH:-"."}

# Quote the expansion so a path containing whitespace is not word-split, and stop if the
# directory cannot be entered rather than resolving venv/bin/python from the wrong place.
cd "${QUAYDIR:-/}" || exit 1

# stderr is merged into stdout so both streams reach whatever consumes this script's output.
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.tagbackfillworker 2>&1

echo 'Repository tag backfill exited'

View file

@ -64,7 +64,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
if backfill_if_necessary:
return self.backfill_manifest_for_tag(tag)
return
return None
return Manifest.for_tag_manifest(tag_manifest)

View file

@ -0,0 +1,361 @@
import logging
import logging.config
import time
from peewee import JOIN, fn, IntegrityError
from app import app
from data.database import (UseThenDisconnect, TagToRepositoryTag, RepositoryTag,
TagManifestToManifest, Tag, TagManifest, TagManifestToManifest, Image,
Manifest, TagManifestLabel, ManifestLabel, TagManifestLabelMap, db_transaction)
from data.model import DataModelException
from data.model.image import get_parent_images
from data.model.tag import populate_manifest
from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist
from data.registry_model import pre_oci_model
from data.registry_model.datatypes import Tag as TagDataType
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
from workers.worker import Worker
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Passed as the second argument to Worker.add_operation when registering the backfill
# operation. NOTE(review): presumably the number of seconds between runs -- confirm against
# workers.worker.Worker.add_operation.
WORKER_TIMEOUT = 600
class BrokenManifest(ManifestInterface):
    """ Stand-in ManifestInterface implementation for "broken" manifests.

    Only the digest and payload given to the constructor are exposed; every other accessor
    returns an empty or null value. This allows the new manifest row to be added without
    adding any additional rows for it.
    """

    def __init__(self, digest, payload):
        self._digest = digest
        self._payload = payload

    # --- Identity and raw content -------------------------------------------------------------

    @property
    def digest(self):
        """ The digest supplied at construction time. """
        return self._digest

    @property
    def bytes(self):
        """ The payload supplied at construction time, unmodified. """
        return self._payload

    @property
    def media_type(self):
        """ Always reported as the signed Docker schema 1 content type. """
        return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE

    @property
    def schema_version(self):
        """ Always reported as schema version 1. """
        return 1

    @property
    def manifest_dict(self):
        """ No parsed contents are available, so an empty dict is returned. """
        return {}

    # --- Contents: a broken manifest exposes no layers, blobs, images or labels ---------------

    @property
    def blob_digests(self):
        return []

    @property
    def local_blob_digests(self):
        return []

    @property
    def layers_compressed_size(self):
        return None

    @property
    def is_manifest_list(self):
        return False

    @property
    def has_legacy_image(self):
        return False

    def get_layers(self, content_retriever):
        return None

    def get_legacy_image_ids(self, cr):
        return []

    def get_leaf_layer_v1_image_id(self, cr):
        return None

    def child_manifests(self, lookup_manifest_fn):
        return None

    def get_manifest_labels(self, lookup_config_fn):
        return {}

    def generate_legacy_layers(self, images_map, lookup_config_fn):
        return None

    def get_requires_empty_layer_blob(self, content_retriever):
        return False

    # --- Conversions: there is nothing to convert, so the manifest returns itself -------------

    def unsigned(self):
        return self

    def get_schema1_manifest(self, namespace_name, repo_name, tag_name, lookup_fn):
        return self
class TagBackfillWorker(Worker):
    """ Worker which backfills a new-style Tag row for each visible RepositoryTag row that
    does not yet have a TagToRepositoryTag mapping row.
    """

    def __init__(self):
        super(TagBackfillWorker, self).__init__()
        self.add_operation(self._backfill_tags, WORKER_TIMEOUT)

    def _candidates_to_backfill(self):
        """ Returns the result of yield_random_entries over the non-hidden RepositoryTag rows
        which have no TagToRepositoryTag row.
        """
        def missing_tmt_query():
            # The LEFT OUTER join plus the `id IS NULL` filter (`>> None`) keeps only
            # RepositoryTag rows without a mapping row. `== False` is required here: peewee
            # builds the SQL expression from the overloaded operator.
            return (RepositoryTag
                    .select()
                    .join(TagToRepositoryTag, JOIN.LEFT_OUTER)
                    .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False))

        # Smallest ID amongst the unmapped candidates.
        min_id = (RepositoryTag
                  .select(fn.Min(RepositoryTag.id))
                  .join(TagToRepositoryTag, JOIN.LEFT_OUTER)
                  .where(TagToRepositoryTag.id >> None, RepositoryTag.hidden == False)
                  .scalar())

        # Largest ID amongst all RepositoryTag rows, mapped or not.
        max_id = RepositoryTag.select(fn.Max(RepositoryTag.id)).scalar()

        iterator = yield_random_entries(
            missing_tmt_query,
            RepositoryTag.id,
            1000,
            max_id,
            min_id,
        )

        return iterator

    def _backfill_tags(self):
        """ Runs backfill_tag on each candidate, signalling `abt` whenever one is not
        backfilled.
        """
        with UseThenDisconnect(app.config):
            iterator = self._candidates_to_backfill()

            # NOTE(review): _candidates_to_backfill returns whatever yield_random_entries
            # returns and never returns None itself; confirm whether this branch can be reached.
            if iterator is None:
                logger.debug('Found no additional tags to backfill')
                time.sleep(10000)
                return None

            for candidate, abt, _ in iterator:
                if not backfill_tag(candidate):
                    # The candidate is a RepositoryTag, so report it as a tag (the message
                    # previously said "label").
                    logger.info('Another worker pre-empted us for tag: %s', candidate.id)
                    # NOTE(review): `abt` is presumably the abort signal for the current batch
                    # of candidates -- confirm against util.migrate.allocator.
                    abt.set()
def lookup_map_row(repositorytag):
    """ Returns whether a TagToRepositoryTag row exists for the given RepositoryTag. """
    try:
        TagToRepositoryTag.get(repository_tag=repositorytag)
    except TagToRepositoryTag.DoesNotExist:
        return False
    else:
        return True
def backfill_tag(repositorytag):
    """ Creates the new-style Tag row, and its TagToRepositoryTag mapping row, for the given
    RepositoryTag.

    Returns True if the rows were created. Returns False if a mapping row already exists, if
    no manifest ID could be found for the tag, or if creating the rows raised an
    IntegrityError.
    """
    logger.info('Backfilling tag %s', repositorytag.id)

    # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
    if lookup_map_row(repositorytag):
        return False

    # Grab the manifest for the RepositoryTag, backfilling if necessary.
    manifest_id = _get_manifest_id(repositorytag)
    if manifest_id is None:
        return False

    # The `*_ts` values are multiplied by 1000 to fill the `*_ms` fields. Compare against None
    # explicitly rather than relying on truthiness: a timestamp of 0 must be converted to 0
    # instead of becoming None (for the end timestamp, None would mean "no end set").
    lifetime_start_ms = (repositorytag.lifetime_start_ts * 1000
                         if repositorytag.lifetime_start_ts is not None else None)
    lifetime_end_ms = (repositorytag.lifetime_end_ts * 1000
                       if repositorytag.lifetime_end_ts is not None else None)

    # Create the new Tag.
    with db_transaction():
        # Check again inside the transaction, as the mapping row may have been created while
        # the manifest was being looked up.
        if lookup_map_row(repositorytag):
            return False

        try:
            created = Tag.create(name=repositorytag.name,
                                 repository=repositorytag.repository,
                                 lifetime_start_ms=lifetime_start_ms,
                                 lifetime_end_ms=lifetime_end_ms,
                                 reversion=repositorytag.reversion,
                                 manifest=manifest_id,
                                 tag_kind=Tag.tag_kind.get_id('tag'))

            TagToRepositoryTag.create(tag=created, repository_tag=repositorytag,
                                      repository=repositorytag.repository)
        except IntegrityError:
            logger.exception('Could not create tag for repo tag `%s`', repositorytag.id)
            return False

    logger.info('Backfilled tag %s', repositorytag.id)
    return True
def lookup_manifest_map_row(tag_manifest):
    """ Returns whether a TagManifestToManifest row exists for the given TagManifest. """
    try:
        TagManifestToManifest.get(tag_manifest=tag_manifest)
    except TagManifestToManifest.DoesNotExist:
        return False
    else:
        return True
def _get_manifest_id(repositorytag):
    """ Returns the ID of the new-style Manifest row for the given RepositoryTag, backfilling
    the manifest first if no mapping row exists yet.

    Returns None if the tag has no manifest, if its TagManifest row cannot be found, or if no
    mapping row exists even after the backfill attempt.
    """
    tag_datatype = TagDataType.for_repository_tag(repositorytag)

    # Look up the old-style manifest for the tag, asking the model to backfill it if needed.
    with db_transaction():
        manifest_datatype = pre_oci_model.get_manifest_for_tag(tag_datatype,
                                                               backfill_if_necessary=True)
        if manifest_datatype is None:
            logger.error('Missing manifest for tag `%s`', repositorytag.id)
            return None

    # Load the TagManifest row behind the datatype.
    try:
        tag_manifest = TagManifest.get(id=manifest_datatype._db_id)
    except TagManifest.DoesNotExist:
        logger.exception('Could not find tag manifest')
        return None

    # First attempt: the new-style manifest may already be mapped.
    try:
        return TagManifestToManifest.get(tag_manifest=tag_manifest).manifest_id
    except TagManifestToManifest.DoesNotExist:
        pass

    # No mapping row was found, so backfill and look once more.
    _backfill_manifest(tag_manifest)

    try:
        mapping_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
    except TagManifestToManifest.DoesNotExist:
        return None

    return mapping_row.manifest_id
def _backfill_manifest(tag_manifest):
    """ Creates the new-style Manifest rows, the TagManifestToManifest mapping row and the
    labels for the given TagManifest.

    Returns False if a mapping row already exists or if row creation raised an
    IntegrityError (i.e. we were preempted). Returns True if the mapping row was created, and
    also if the TagManifest row no longer exists by the time the transaction starts.
    """
    logger.info('Backfilling manifest for tag manifest %s', tag_manifest.id)

    # A mapping row already being present means we've been preempted.
    if lookup_manifest_map_row(tag_manifest):
        return False

    # Parse the manifest. A manifest which cannot be parsed is treated as broken and emitted
    # without additional rows or data, as it will eventually not be useful.
    is_broken = False
    try:
        manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data,
                                                          validate=False)
    except ManifestException:
        logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
        manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data)
        is_broken = True

    # Map content checksum -> storage ID for the tag's image and all of its parent images.
    root_image = tag_manifest.tag.image
    repository = tag_manifest.tag.repository
    checksum_to_storage_id = {root_image.storage.content_checksum: root_image.storage.id}

    try:
        parent_images = get_parent_images(repository.namespace_user.username, repository.name,
                                          root_image)
    except DataModelException:
        logger.exception('Exception when trying to load parent images for manifest `%s`',
                         tag_manifest.id)
        parent_images = []
        is_broken = True

    for parent in parent_images:
        checksum_to_storage_id[parent.storage.content_checksum] = parent.storage.id

    # Resolve every blob digest in the manifest to a storage ID. Digests not covered by the
    # images are looked up under the repository; a digest found in neither place marks the
    # manifest as broken.
    storage_ids = set()
    for blob_digest in manifest.blob_digests:
        if blob_digest in checksum_to_storage_id:
            storage_ids.add(checksum_to_storage_id[blob_digest])
            continue

        logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo',
                     blob_digest, tag_manifest.id)
        try:
            blob_storage = get_repo_blob_by_digest(repository.namespace_user.username,
                                                   repository.name, blob_digest)
        except BlobDoesNotExist:
            logger.debug('Blob `%s` not found in repo for manifest `%s`',
                         blob_digest, tag_manifest.id)
            is_broken = True
        else:
            storage_ids.add(blob_storage.id)

    with db_transaction():
        # Reload the tag manifest to ensure it still exists and still points at the right tag.
        try:
            tag_manifest = TagManifest.get(id=tag_manifest.id)
        except TagManifest.DoesNotExist:
            return True

        # Someone may have created the mapping row while we were working.
        if lookup_manifest_map_row(tag_manifest):
            return False

        # Reuse a manifest row with the same digest in the repository if one exists. This can
        # happen if the manifest row was already created (typically for tag reversion).
        try:
            manifest_row = Manifest.get(digest=manifest.digest,
                                        repository=tag_manifest.tag.repository)
        except Manifest.DoesNotExist:
            # Otherwise, create the new-style rows for the manifest.
            try:
                manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
                                                 tag_manifest.tag.image, storage_ids)
            except IntegrityError:
                # Pre-empted.
                return False

        # Create the mapping row. An IntegrityError here means another one was created for
        # this tag manifest in the meantime, so we've been preempted.
        try:
            TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
                                         broken=is_broken)
        except IntegrityError:
            return False

    # Finally, copy over any labels on the manifest.
    _backfill_labels(tag_manifest, manifest_row, repository)
    return True
def _backfill_labels(tag_manifest, manifest, repository):
    """ Creates a ManifestLabel row and a TagManifestLabelMap row for each TagManifestLabel
    on the given tag manifest that does not have a map row yet.
    """
    label_query = TagManifestLabel.select().where(TagManifestLabel.annotated == tag_manifest)

    for old_label_row in list(label_query):
        label = old_label_row.label

        # Skip any label which has already been mapped.
        try:
            TagManifestLabelMap.get(tag_manifest_label=old_label_row)
        except TagManifestLabelMap.DoesNotExist:
            already_mapped = False
        else:
            already_mapped = True

        if already_mapped:
            continue

        # Create the new-style label and its map row; an IntegrityError skips this label and
        # moves on to the next one.
        try:
            new_label_row = ManifestLabel.create(manifest=manifest, label=label,
                                                 repository=repository)
            TagManifestLabelMap.create(manifest_label=new_label_row,
                                       tag_manifest_label=old_label_row,
                                       label=label,
                                       manifest=manifest,
                                       tag_manifest=old_label_row.annotated)
        except IntegrityError:
            continue
if __name__ == "__main__":
    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    backfill_enabled = app.config.get('BACKFILL_TAGS', False)
    if not backfill_enabled:
        logger.debug('Tag backfill disabled; skipping')
        # Sleep forever rather than exiting (presumably so that a process supervisor does not
        # keep restarting a disabled worker).
        while True:
            time.sleep(100000)

    TagBackfillWorker().start()

View file

@ -0,0 +1,223 @@
from app import docker_v2_signing_key
from data import model
from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
TagManifestLabel, Tag, TagToRepositoryTag)
from image.docker.schema1 import DockerSchema1ManifestBuilder
from workers.tagbackfillworker import backfill_tag, _backfill_manifest
from test.fixtures import *
@pytest.fixture()
def clear_rows(initialized_db):
    """ Fixture which removes all new-style rows so that they can be backfilled. """
    # Deleted in the listed order: mapping rows and dependent rows before Manifest itself.
    new_style_tables = (
        TagToRepositoryTag,
        Tag,
        TagManifestLabelMap,
        ManifestLabel,
        ManifestBlob,
        ManifestLegacyImage,
        TagManifestToManifest,
        Manifest,
    )

    for table in new_style_tables:
        table.delete().execute()
@pytest.mark.parametrize('clear_all_rows', [
    True,
    False,
])
def test_tagbackfillworker(clear_all_rows, initialized_db):
    """ Backfills every RepositoryTag and checks the resulting tag, manifest, blob and label
    rows, both with and without the new-style manifest rows removed beforehand.
    """
    # Remove the new-style rows so we can backfill.
    tables_to_clear = [TagToRepositoryTag, Tag]
    if clear_all_rows:
        tables_to_clear += [
            TagManifestLabelMap,
            ManifestLabel,
            ManifestBlob,
            ManifestLegacyImage,
            TagManifestToManifest,
            Manifest,
        ]

    for table in tables_to_clear:
        table.delete().execute()

    for repository_tag in list(RepositoryTag.select()):
        # The first backfill must succeed; a second attempt must be skipped.
        assert backfill_tag(repository_tag)
        assert not backfill_tag(repository_tag)

        # Check the new tag row against the old one.
        tag = TagToRepositoryTag.get(repository_tag=repository_tag).tag
        assert tag.name == repository_tag.name
        assert tag.repository == repository_tag.repository
        assert not tag.hidden
        assert tag.reversion == repository_tag.reversion

        if repository_tag.lifetime_start_ts is not None:
            assert tag.lifetime_start_ms == (repository_tag.lifetime_start_ts * 1000)
        else:
            assert tag.lifetime_start_ms is None

        if repository_tag.lifetime_end_ts is not None:
            assert tag.lifetime_end_ms == (repository_tag.lifetime_end_ts * 1000)
        else:
            assert tag.lifetime_end_ms is None

        assert tag.manifest

        # Tags without a TagManifest have no manifest rows to check.
        try:
            tag_manifest = TagManifest.get(tag=repository_tag)
        except TagManifest.DoesNotExist:
            continue

        # Check the manifest rows.
        map_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
        assert not map_row.broken

        manifest_row = map_row.manifest
        assert manifest_row.manifest_bytes == tag_manifest.json_data
        assert manifest_row.digest == tag_manifest.digest
        assert manifest_row.repository == tag_manifest.tag.repository
        assert tag.manifest == map_row.manifest

        legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
        assert tag_manifest.tag.image == legacy_image

        # The manifest's blobs must be the storages of the image and all of its ancestors.
        expected_storages = {tag_manifest.tag.image.storage.id}
        for ancestor_id in tag_manifest.tag.image.ancestor_id_list():
            expected_storages.add(Image.get(id=ancestor_id).storage_id)

        blob_rows = ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)
        found_storages = {blob_row.blob_id for blob_row in blob_rows}
        assert expected_storages == found_storages

        # Ensure the labels were copied over.
        old_label_rows = list(TagManifestLabel
                              .select()
                              .where(TagManifestLabel.annotated == tag_manifest))
        expected_labels = {old_row.label_id for old_row in old_label_rows}

        new_label_rows = ManifestLabel.select().where(ManifestLabel.manifest == manifest_row)
        found_labels = {new_row.label_id for new_row in new_label_rows}
        assert found_labels == expected_labels
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
    """ Tests that a manifest which cannot be parsed is backfilled and marked as broken. """
    # Remove the existing tag manifests (and their labels) so a tag can be reused.
    TagManifestLabel.delete().execute()
    TagManifest.delete().execute()

    # Attach a manifest with unparseable contents to some tag.
    some_tag = RepositoryTag.get()
    broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar',
                                         tag=some_tag)

    # The backfill should still succeed...
    assert _backfill_manifest(broken_manifest)

    # ...with the mapping flagged as broken.
    map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
    assert map_row.broken

    manifest_row = map_row.manifest
    assert manifest_row.manifest_bytes == broken_manifest.json_data
    assert manifest_row.digest == broken_manifest.digest
    assert manifest_row.repository == broken_manifest.tag.repository

    legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
    assert broken_manifest.tag.image == legacy_image
def test_manifestbackfillworker_mislinked_manifest(clear_rows, initialized_db):
    """ Tests that a manifest whose image is mislinked will have its storages relinked properly. """
    # Delete existing tag manifest so we can reuse the tag.
    TagManifestLabel.delete().execute()
    TagManifest.delete().execute()

    # NOTE: the unused lookup of the `devtable/complex` repository was removed; the test only
    # works with tags in `devtable/gargantuan`.
    tag_v30 = model.tag.get_active_tag('devtable', 'gargantuan', 'v3.0')
    tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')

    # Add a mislinked manifest, by having its layer point to a blob in v3.0 but its image
    # be the v5.0 image.
    builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
    builder.add_layer(tag_v30.image.storage.content_checksum, '{"id": "foo"}')
    manifest = builder.build(docker_v2_signing_key)

    mislinked_manifest = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
                                            tag=tag_v50)

    # Backfill the manifest and ensure its proper content checksum was linked.
    assert _backfill_manifest(mislinked_manifest)

    map_row = TagManifestToManifest.get(tag_manifest=mislinked_manifest)
    assert not map_row.broken

    manifest_row = map_row.manifest
    legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
    assert legacy_image == tag_v50.image

    # The single linked blob must be the v3.0 storage named by the manifest's layer.
    manifest_blobs = list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))
    assert len(manifest_blobs) == 1
    assert manifest_blobs[0].blob.content_checksum == tag_v30.image.storage.content_checksum
def test_manifestbackfillworker_mislinked_invalid_manifest(clear_rows, initialized_db):
    """ Tests that a manifest whose image is mislinked will attempt to have its storages relinked
        properly. """
    # Delete existing tag manifest so we can reuse the tag.
    TagManifestLabel.delete().execute()
    TagManifest.delete().execute()

    # NOTE: the unused lookup of the `devtable/complex` repository was removed; the test only
    # works with a tag in `devtable/gargantuan`.
    tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')

    # Add a mislinked manifest, by having its layer point to an invalid blob but its image
    # be the v5.0 image.
    builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
    builder.add_layer('sha256:deadbeef', '{"id": "foo"}')
    manifest = builder.build(docker_v2_signing_key)

    broken_manifest = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
                                         tag=tag_v50)

    # Backfill the manifest and ensure it is marked as broken.
    assert _backfill_manifest(broken_manifest)

    map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
    assert map_row.broken

    manifest_row = map_row.manifest
    legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
    assert legacy_image == tag_v50.image

    # The invalid blob digest cannot be resolved, so no blobs are linked.
    manifest_blobs = list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))
    assert len(manifest_blobs) == 0
def test_manifestbackfillworker_repeat_digest(clear_rows, initialized_db):
    """ Tests that a manifest with a shared digest will be properly linked. """
    # Delete existing tag manifest so we can reuse the tag.
    TagManifestLabel.delete().execute()
    TagManifest.delete().execute()

    # NOTE: the unused repository lookup was removed; only the two tags below are needed.
    tag_v30 = model.tag.get_active_tag('devtable', 'gargantuan', 'v3.0')
    tag_v50 = model.tag.get_active_tag('devtable', 'gargantuan', 'v5.0')

    # Build a manifest and assign it to both tags (this is allowed in the old model).
    builder = DockerSchema1ManifestBuilder('devtable', 'gargantuan', 'sometag')
    builder.add_layer('sha256:deadbeef', '{"id": "foo"}')
    manifest = builder.build(docker_v2_signing_key)

    manifest_1 = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
                                    tag=tag_v30)
    manifest_2 = TagManifest.create(json_data=manifest.bytes, digest=manifest.digest,
                                    tag=tag_v50)

    # Backfill "both" manifests and ensure both are pointed to by a single resulting row.
    assert _backfill_manifest(manifest_1)
    assert _backfill_manifest(manifest_2)

    map_row1 = TagManifestToManifest.get(tag_manifest=manifest_1)
    map_row2 = TagManifestToManifest.get(tag_manifest=manifest_2)

    assert map_row1.manifest == map_row2.manifest