Merge pull request #3194 from quay/joseph.schorr/QUAY-1013-1015/backfill-worker
Add worker to backfill the new manifest tables from the tagmanifest table
This commit is contained in:
commit
73e585df49
5 changed files with 256 additions and 7 deletions
4
conf/init/service/batch/manifestbackfillworker/log/run
Executable file
4
conf/init/service/batch/manifestbackfillworker/log/run
Executable file
|
@ -0,0 +1,4 @@
|
|||
#!/bin/sh
|
||||
|
||||
# Start the logger
|
||||
exec logger -i -t manifestbackfillworker
|
9
conf/init/service/batch/manifestbackfillworker/run
Executable file
9
conf/init/service/batch/manifestbackfillworker/run
Executable file
|
@ -0,0 +1,9 @@
|
|||
#! /bin/bash
|
||||
|
||||
echo 'Starting manifest backfill worker'
|
||||
|
||||
QUAYPATH=${QUAYPATH:-"."}
|
||||
cd ${QUAYDIR:-"/"}
|
||||
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.manifestbackfillworker 2>&1
|
||||
|
||||
echo 'Repository manifest backfill exited'
|
|
@ -605,12 +605,21 @@ def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest, s
|
|||
|
||||
|
||||
def _create_manifest(tag, manifest, storage_id_map):
|
||||
media_type = Manifest.media_type.get_id(manifest.media_type)
|
||||
manifest_row = populate_manifest(tag.repository, manifest, tag.image, storage_id_map)
|
||||
|
||||
with db_transaction():
|
||||
manifest_row = Manifest.create(digest=manifest.digest, repository=tag.repository,
|
||||
tag_manifest = TagManifest.create(tag=tag, digest=manifest.digest, json_data=manifest.bytes)
|
||||
TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row)
|
||||
return tag_manifest
|
||||
|
||||
|
||||
def populate_manifest(repository, manifest, legacy_image, storage_id_map):
|
||||
""" Populates the rows for the manifest, including its blobs and legacy image. """
|
||||
media_type = Manifest.media_type.get_id(manifest.media_type)
|
||||
with db_transaction():
|
||||
manifest_row = Manifest.create(digest=manifest.digest, repository=repository,
|
||||
manifest_bytes=manifest.bytes, media_type=media_type)
|
||||
ManifestLegacyImage.create(manifest=manifest_row, repository=tag.repository, image=tag.image)
|
||||
ManifestLegacyImage.create(manifest=manifest_row, repository=repository, image=legacy_image)
|
||||
|
||||
blobs_to_insert = []
|
||||
blobs_created = set()
|
||||
|
@ -623,16 +632,14 @@ def _create_manifest(tag, manifest, storage_id_map):
|
|||
if image_storage_id in blobs_created:
|
||||
continue
|
||||
|
||||
blobs_to_insert.append(dict(manifest=manifest_row, repository=tag.repository,
|
||||
blobs_to_insert.append(dict(manifest=manifest_row, repository=repository,
|
||||
blob=image_storage_id))
|
||||
blobs_created.add(image_storage_id)
|
||||
|
||||
if blobs_to_insert:
|
||||
ManifestBlob.insert_many(blobs_to_insert).execute()
|
||||
|
||||
tag_manifest = TagManifest.create(tag=tag, digest=manifest.digest, json_data=manifest.bytes)
|
||||
TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row)
|
||||
return tag_manifest
|
||||
return manifest_row
|
||||
|
||||
|
||||
def load_tag_manifest(namespace, repo_name, tag_name):
|
||||
|
|
157
workers/manifestbackfillworker.py
Normal file
157
workers/manifestbackfillworker.py
Normal file
|
@ -0,0 +1,157 @@
|
|||
import logging
|
||||
import logging.config
|
||||
|
||||
from peewee import JOIN, fn, IntegrityError
|
||||
|
||||
from app import app
|
||||
from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image,
|
||||
db_transaction)
|
||||
from data.model.tag import populate_manifest
|
||||
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
|
||||
DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
|
||||
from workers.worker import Worker
|
||||
from util.log import logfile_path
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
WORKER_TIMEOUT = 600
|
||||
|
||||
class BrokenManifest(ManifestInterface):
|
||||
""" Implementation of the ManifestInterface for "broken" manifests. This allows us to add the
|
||||
new manifest row while not adding any additional rows for it.
|
||||
"""
|
||||
def __init__(self, digest, payload):
|
||||
self._digest = digest
|
||||
self._payload = payload
|
||||
|
||||
@property
|
||||
def digest(self):
|
||||
return self._digest
|
||||
|
||||
@property
|
||||
def media_type(self):
|
||||
return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE
|
||||
|
||||
@property
|
||||
def manifest_dict(self):
|
||||
return {}
|
||||
|
||||
@property
|
||||
def bytes(self):
|
||||
return self._payload
|
||||
|
||||
@property
|
||||
def layers(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def leaf_layer_v1_image_id(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def blob_digests(self):
|
||||
return []
|
||||
|
||||
|
||||
class ManifestBackfillWorker(Worker):
|
||||
def __init__(self):
|
||||
super(ManifestBackfillWorker, self).__init__()
|
||||
self.add_operation(self._backfill_manifests, WORKER_TIMEOUT)
|
||||
|
||||
def _candidates_to_backfill(self):
|
||||
def missing_tmt_query():
|
||||
return (TagManifest
|
||||
.select()
|
||||
.join(TagManifestToManifest, JOIN.LEFT_OUTER)
|
||||
.where(TagManifestToManifest.id >> None))
|
||||
|
||||
min_id = (TagManifest
|
||||
.select(fn.Min(TagManifest.id))
|
||||
.join(TagManifestToManifest, JOIN.LEFT_OUTER)
|
||||
.where(TagManifestToManifest.id >> None)
|
||||
.scalar())
|
||||
max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar()
|
||||
|
||||
iterator = yield_random_entries(
|
||||
missing_tmt_query,
|
||||
TagManifest.id,
|
||||
100,
|
||||
max_id,
|
||||
min_id,
|
||||
)
|
||||
|
||||
return iterator
|
||||
|
||||
def _backfill_manifests(self):
|
||||
""" Performs garbage collection on repositories. """
|
||||
with UseThenDisconnect(app.config):
|
||||
iterator = self._candidates_to_backfill()
|
||||
if iterator is None:
|
||||
logger.debug('Found no additional images to scan')
|
||||
return None
|
||||
|
||||
for candidate, abt, _ in iterator:
|
||||
if not backfill_manifest(candidate):
|
||||
logger.info('Another worker pre-empted us for manifest: %s', candidate.id)
|
||||
abt.set()
|
||||
|
||||
|
||||
def lookup_map_row(tag_manifest):
|
||||
try:
|
||||
TagManifestToManifest.get(tag_manifest=tag_manifest)
|
||||
return True
|
||||
except TagManifestToManifest.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
||||
def backfill_manifest(tag_manifest):
|
||||
logger.info('Backfilling manifest %s', tag_manifest.id)
|
||||
|
||||
# Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
|
||||
if lookup_map_row(tag_manifest):
|
||||
return False
|
||||
|
||||
# Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it
|
||||
# without additional rows or data, as it will eventually not be useful.
|
||||
is_broken = False
|
||||
try:
|
||||
manifest = DockerSchema1Manifest(tag_manifest.json_data, validate=False)
|
||||
except ManifestException:
|
||||
logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
|
||||
manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data)
|
||||
is_broken = True
|
||||
|
||||
# Lookup the storages for the digests.
|
||||
root_image = tag_manifest.tag.image
|
||||
storage_id_map = {root_image.storage.content_checksum: root_image.storage.id}
|
||||
for parent_image_id in root_image.ancestor_id_list():
|
||||
storage = Image.get(id=parent_image_id).storage
|
||||
storage_id_map[storage.content_checksum] = storage.id
|
||||
|
||||
with db_transaction():
|
||||
# Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag.
|
||||
try:
|
||||
tag_manifest = TagManifest.get(id=tag_manifest.id)
|
||||
except TagManifest.DoesNotExist:
|
||||
return True
|
||||
|
||||
# Create the new-style rows for the manifest.
|
||||
manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
|
||||
tag_manifest.tag.image, storage_id_map)
|
||||
|
||||
# Create the mapping row. If we find another was created for this tag manifest in the
|
||||
# meantime, then we've been preempted.
|
||||
try:
|
||||
TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
|
||||
broken=is_broken)
|
||||
return True
|
||||
except IntegrityError:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
|
||||
worker = ManifestBackfillWorker()
|
||||
worker.start()
|
72
workers/test/test_manifestbackfillworker.py
Normal file
72
workers/test/test_manifestbackfillworker.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
import pytest

from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
                           ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
                           TagManifestLabel)
from workers.manifestbackfillworker import backfill_manifest

from test.fixtures import *
|
||||
|
||||
@pytest.fixture()
|
||||
def clear_rows(initialized_db):
|
||||
# Remove all new-style rows so we can backfill.
|
||||
TagManifestLabelMap.delete().execute()
|
||||
ManifestLabel.delete().execute()
|
||||
ManifestBlob.delete().execute()
|
||||
ManifestLegacyImage.delete().execute()
|
||||
TagManifestToManifest.delete().execute()
|
||||
Manifest.delete().execute()
|
||||
|
||||
|
||||
def test_manifestbackfillworker(clear_rows, initialized_db):
|
||||
for tag_manifest in TagManifest.select():
|
||||
# Backfill the manifest.
|
||||
assert backfill_manifest(tag_manifest)
|
||||
|
||||
# Ensure if we try again, the backfill is skipped.
|
||||
assert not backfill_manifest(tag_manifest)
|
||||
|
||||
# Ensure that we now have the expected manifest rows.
|
||||
map_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
|
||||
assert not map_row.broken
|
||||
|
||||
manifest_row = map_row.manifest
|
||||
assert manifest_row.manifest_bytes == tag_manifest.json_data
|
||||
assert manifest_row.digest == tag_manifest.digest
|
||||
assert manifest_row.repository == tag_manifest.tag.repository
|
||||
|
||||
legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
|
||||
assert tag_manifest.tag.image == legacy_image
|
||||
|
||||
expected_storages = {tag_manifest.tag.image.storage.id}
|
||||
for parent_image_id in tag_manifest.tag.image.ancestor_id_list():
|
||||
expected_storages.add(Image.get(id=parent_image_id).storage_id)
|
||||
|
||||
found_storages = {manifest_blob.blob_id for manifest_blob
|
||||
in ManifestBlob.select().where(ManifestBlob.manifest == manifest_row)}
|
||||
assert expected_storages == found_storages
|
||||
|
||||
|
||||
def test_manifestbackfillworker_broken_manifest(clear_rows, initialized_db):
|
||||
# Delete existing tag manifest so we can reuse the tag.
|
||||
TagManifestLabel.delete().execute()
|
||||
TagManifest.delete().execute()
|
||||
|
||||
# Add a broken manifest.
|
||||
broken_manifest = TagManifest.create(json_data='wat?', digest='sha256:foobar',
|
||||
tag=RepositoryTag.get())
|
||||
|
||||
# Ensure the backfill works.
|
||||
assert backfill_manifest(broken_manifest)
|
||||
|
||||
# Ensure the mapping is marked as broken.
|
||||
map_row = TagManifestToManifest.get(tag_manifest=broken_manifest)
|
||||
assert map_row.broken
|
||||
|
||||
manifest_row = map_row.manifest
|
||||
assert manifest_row.manifest_bytes == broken_manifest.json_data
|
||||
assert manifest_row.digest == broken_manifest.digest
|
||||
assert manifest_row.repository == broken_manifest.tag.repository
|
||||
|
||||
legacy_image = ManifestLegacyImage.get(manifest=manifest_row).image
|
||||
assert broken_manifest.tag.image == legacy_image
|
||||
|
||||
assert not list(ManifestBlob.select().where(ManifestBlob.manifest == manifest_row))
|
Reference in a new issue