Add worker to backfill the new manifest tables from the tagmanifest table
commit 03ea3a3250 · parent 805e928dff
3 changed files with 241 additions and 7 deletions
workers/manifestbackfillworker.py · 155 lines · new file
@@ -0,0 +1,155 @@
import logging
import logging.config

from peewee import JOIN, fn, IntegrityError

from app import app
from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image,
                           db_transaction)
from data.model.tag import populate_manifest
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
                                  DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
from workers.worker import Worker
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries


logger = logging.getLogger(__name__)


class BrokenManifest(ManifestInterface):
  """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the
      new manifest row while not adding any additional rows for it.
  """
  def __init__(self, digest, payload):
    self._digest = digest
    self._payload = payload

  @property
  def digest(self):
    return self._digest

  @property
  def media_type(self):
    return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE

  @property
  def manifest_dict(self):
    return {}

  @property
  def bytes(self):
    return self._payload

  @property
  def layers(self):
    return []

  @property
  def leaf_layer_v1_image_id(self):
    return None

  @property
  def blob_digests(self):
    return []
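
# NOTE: BrokenManifest reports no layers and no blob digests, so (assuming
# populate_manifest derives its secondary rows from those properties) only the
# manifest row itself, digest plus raw payload, gets written for unparseable
# manifests, which is the behavior the class docstring describes.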


class ManifestBackfillWorker(Worker):
  def __init__(self):
    super(ManifestBackfillWorker, self).__init__()
    self.add_operation(self._backfill_manifests, 1)

  def _candidates_to_backfill(self):
    def missing_tmt_query():
      return (TagManifest
              .select()
              .join(TagManifestToManifest, JOIN.LEFT_OUTER)
              .where(TagManifestToManifest.id >> None))

    min_id = (TagManifest
              .select(fn.Min(TagManifest.id))
              .join(TagManifestToManifest, JOIN.LEFT_OUTER)
              .where(TagManifestToManifest.id >> None)
              .scalar())
    max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar()

    iterator = yield_random_entries(
      missing_tmt_query,
      TagManifest.id,
      100,
      max_id,
      min_id,
    )

    return iterator

  def _backfill_manifests(self):
    """ Backfills any TagManifest rows that are missing new-style manifest rows. """
    with UseThenDisconnect(app.config):
      iterator = self._candidates_to_backfill()
      if iterator is None:
        logger.debug('Found no additional manifests to backfill')
        return None

      for candidate, abt, _ in iterator:
        if not backfill_manifest(candidate):
          logger.info('Another worker pre-empted us for manifest: %s', candidate.id)
          abt.set()
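
# NOTE on the allocator contract as used above (yield_random_entries itself
# lives in util.migrate.allocator, not in this file; this is our reading of
# it): it pulls candidates in random batches (batch size 100 here) between
# min_id and max_id, and each yielded tuple is (candidate row, abort event,
# extra value ignored here). Setting the abort event tells the allocator the
# candidate was already handled by another worker.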


def lookup_map_row(tag_manifest):
  try:
    TagManifestToManifest.get(tag_manifest=tag_manifest)
    return True
  except TagManifestToManifest.DoesNotExist:
    return False


def backfill_manifest(tag_manifest):
  logger.info('Backfilling manifest %s', tag_manifest.id)

  # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
  if lookup_map_row(tag_manifest):
    return False

  # Parse the manifest. If we cannot parse, then we treat the manifest as broken and just emit it
  # without additional rows or data, as it will eventually not be useful.
  is_broken = False
  try:
    manifest = DockerSchema1Manifest(tag_manifest.json_data, validate=False)
  except ManifestException:
    logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
    manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data)
    is_broken = True

  # Lookup the storages for the digests.
  root_image = tag_manifest.tag.image
  storage_id_map = {root_image.storage.content_checksum: root_image.storage.id}
  for parent_image_id in root_image.ancestor_id_list():
    storage = Image.get(id=parent_image_id).storage
    storage_id_map[storage.content_checksum] = storage.id

  with db_transaction():
    # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the
    # correct tag.
    try:
      tag_manifest = TagManifest.get(id=tag_manifest.id)
    except TagManifest.DoesNotExist:
      return True

    # Create the new-style rows for the manifest.
    manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
                                     tag_manifest.tag.image, storage_id_map)

    # Create the mapping row. If we find another was created for this tag manifest in the
    # meantime, then we've been preempted.
    try:
      TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
                                   broken=is_broken)
      return True
    except IntegrityError:
      return False
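
# NOTE: the IntegrityError branch above assumes a uniqueness constraint on
# TagManifestToManifest.tag_manifest; under that assumption, a duplicate insert
# from a racing worker aborts the transaction and surfaces to the caller as
# pre-emption (a False return).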


if __name__ == "__main__":
  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
  worker = ManifestBackfillWorker()
  worker.start()
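
For reference, a minimal sketch of how the backfill entrypoint could be exercised by hand, assuming a fully configured application environment; the row id used here is hypothetical:

  # Hypothetical one-off run from a shell inside the application environment.
  from data.database import TagManifest
  from workers.manifestbackfillworker import backfill_manifest

  tag_manifest = TagManifest.get(id=1234)  # hypothetical row id
  if backfill_manifest(tag_manifest):
    print('manifest backfilled')
  else:
    print('pre-empted, or the mapping row already existed')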