a04658085b
This will allow us to easily turn off the backfill once it is initially complete, but also re-enable it if necessary
208 lines
6.8 KiB
Python
208 lines
6.8 KiB
Python
import logging
|
|
import logging.config
|
|
import time
|
|
|
|
from peewee import JOIN, fn, IntegrityError
|
|
|
|
from app import app
|
|
from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image,
|
|
Manifest, db_transaction)
|
|
from data.model import DataModelException
|
|
from data.model.image import get_parent_images
|
|
from data.model.tag import populate_manifest
|
|
from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist
|
|
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
|
|
DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
|
|
from workers.worker import Worker
|
|
from util.log import logfile_path
|
|
from util.migrate.allocator import yield_random_entries
|
|
|
|
# Module-level logger for this worker.
logger = logging.getLogger(name=__name__)

# Passed as the second argument to Worker.add_operation when registering the backfill
# operation (600 seconds, i.e. ten minutes).
WORKER_TIMEOUT = 10 * 60
|
|
|
|
class BrokenManifest(ManifestInterface):
    """ A ManifestInterface for manifests whose stored contents could not be used ("broken").

        Only the digest and the raw payload are exposed. Layers, blob digests and the leaf image
        are all reported as empty, so a new manifest row can be added for it without adding any
        additional rows that depend on it.
    """

    def __init__(self, digest, payload):
        self._digest = digest
        self._payload = payload

    # -- Identity and raw content ------------------------------------------------------------

    @property
    def digest(self):
        """ The digest given at construction time. """
        return self._digest

    @property
    def bytes(self):
        """ The raw payload given at construction time, returned unchanged. """
        return self._payload

    @property
    def media_type(self):
        """ Always reports the signed schema1 manifest content type. """
        return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE

    # -- Parsed content: intentionally empty for a broken manifest ---------------------------

    @property
    def manifest_dict(self):
        """ No parsed structure is available; returns a fresh empty dict. """
        return {}

    @property
    def layers(self):
        """ No layers are reported. """
        return []

    @property
    def leaf_layer_v1_image_id(self):
        """ No leaf image can be determined. """
        return None

    @property
    def blob_digests(self):
        """ No blobs are referenced. """
        return []
|
|
|
|
|
|
class ManifestBackfillWorker(Worker):
    """ Worker that creates `TagManifestToManifest` mapping rows (and, when needed, new `Manifest`
        rows) for legacy `TagManifest` rows that do not yet have a mapping row.
    """

    def __init__(self):
        super(ManifestBackfillWorker, self).__init__()
        # NOTE(review): WORKER_TIMEOUT is presumably the scheduling interval in seconds for this
        # operation -- confirm against Worker.add_operation.
        self.add_operation(self._backfill_manifests, WORKER_TIMEOUT)

    def _candidates_to_backfill(self):
        """ Returns an iterator over `TagManifest` rows that lack a `TagManifestToManifest` row.

            The iterator comes from `yield_random_entries` over the ID range
            [min unmapped TagManifest.id, max TagManifest.id] in batches of 100. Each item it
            yields is unpacked by the caller as (candidate, abort_signal, <unused>).

            Returns None when there are no candidates, i.e. once the backfill is complete or when
            the TagManifest table is empty.
        """
        def missing_tmt_query():
            return (TagManifest
                    .select()
                    .join(TagManifestToManifest, JOIN.LEFT_OUTER)
                    .where(TagManifestToManifest.id >> None))

        min_id = (TagManifest
                  .select(fn.Min(TagManifest.id))
                  .join(TagManifestToManifest, JOIN.LEFT_OUTER)
                  .where(TagManifestToManifest.id >> None)
                  .scalar())
        max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar()

        # MIN()/MAX() over zero rows evaluate to NULL (None). Without this guard the None bounds
        # would be handed to yield_random_entries, and the caller's "nothing to do" branch (which
        # checks for None) could never be reached.
        if min_id is None or max_id is None:
            return None

        return yield_random_entries(
            missing_tmt_query,
            TagManifest.id,
            100,
            max_id,
            min_id,
        )

    def _backfill_manifests(self):
        """ Backfills every tag manifest that does not yet have a mapping row.

            Each candidate is handed to `backfill_manifest`. When that reports we were pre-empted,
            the iterator's abort signal is set.
        """
        with UseThenDisconnect(app.config):
            iterator = self._candidates_to_backfill()
            if iterator is None:
                logger.debug('Found no additional manifests to backfill')
                return None

            for candidate, abt, _ in iterator:
                if not backfill_manifest(candidate):
                    logger.info('Another worker pre-empted us for manifest: %s', candidate.id)
                    # Presumably tells yield_random_entries to abandon the current batch, since
                    # another worker appears to be processing the same range.
                    abt.set()
|
|
|
|
|
|
def lookup_map_row(tag_manifest):
    """ Reports whether a `TagManifestToManifest` row already exists for `tag_manifest`.

        Returns True if one is found and False otherwise.
    """
    try:
        TagManifestToManifest.get(tag_manifest=tag_manifest)
    except TagManifestToManifest.DoesNotExist:
        return False
    return True
|
|
|
|
|
|
def _parse_tag_manifest(tag_manifest):
    """ Parses the schema1 manifest bytes stored on `tag_manifest` (without validation).

        Returns a tuple of (manifest, is_broken). If parsing raises ManifestException, the
        manifest is treated as broken: a `BrokenManifest` wrapping the raw bytes is returned with
        is_broken=True, so it can be emitted without additional rows or data.
    """
    try:
        manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data, validate=False)
    except ManifestException:
        logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
        return BrokenManifest(tag_manifest.digest, tag_manifest.json_data), True

    return manifest, False


def _lookup_storage_ids(tag_manifest, manifest):
    """ Resolves the blob digests referenced by `manifest` to image storage IDs.

        Each digest is first matched against the storages of the tag's image and its parent
        images. Any digest not found there is looked up as a blob in the tag's repository.

        Returns a tuple of (storage_ids, is_broken). storage_ids is a set; is_broken is True if
        the parent images could not be loaded or any blob could not be found.
    """
    is_broken = False

    root_image = tag_manifest.tag.image
    repository = tag_manifest.tag.repository
    namespace_name = repository.namespace_user.username
    repo_name = repository.name

    image_storage_id_map = {root_image.storage.content_checksum: root_image.storage.id}

    try:
        parent_images = get_parent_images(namespace_name, repo_name, root_image)
    except DataModelException:
        logger.exception('Exception when trying to load parent images for manifest `%s`',
                         tag_manifest.id)
        parent_images = []
        is_broken = True

    for parent_image in parent_images:
        image_storage_id_map[parent_image.storage.content_checksum] = parent_image.storage.id

    # Ensure that all the expected blobs have been found. If not, we lookup the blob under the
    # repo and add its storage ID. If the blob is not found, we mark the manifest as broken.
    storage_ids = set()
    for blob_digest in manifest.blob_digests:
        if blob_digest in image_storage_id_map:
            storage_ids.add(image_storage_id_map[blob_digest])
            continue

        logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo',
                     blob_digest, tag_manifest.id)
        try:
            blob_storage = get_repo_blob_by_digest(namespace_name, repo_name, blob_digest)
        except BlobDoesNotExist:
            logger.debug('Blob `%s` not found in repo for manifest `%s`',
                         blob_digest, tag_manifest.id)
            is_broken = True
        else:
            storage_ids.add(blob_storage.id)

    return storage_ids, is_broken


def backfill_manifest(tag_manifest):
    """ Backfills the new-style manifest rows for a single legacy `TagManifest`.

        Finds an existing `Manifest` row with the same digest in the tag's repository, or creates
        one via `populate_manifest`. It then creates the `TagManifestToManifest` mapping row,
        flagged `broken` if the manifest could not be parsed or its blobs could not all be
        resolved.

        Returns True if the mapping row was created, or if the tag manifest no longer exists.
        Returns False if we were pre-empted: a mapping row already exists, or an IntegrityError
        occurred while creating the rows.
    """
    logger.info('Backfilling manifest %s', tag_manifest.id)

    # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
    if lookup_map_row(tag_manifest):
        return False

    manifest, parse_broken = _parse_tag_manifest(tag_manifest)
    storage_ids, storage_broken = _lookup_storage_ids(tag_manifest, manifest)
    is_broken = parse_broken or storage_broken

    with db_transaction():
        # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the
        # correct tag.
        try:
            tag_manifest = TagManifest.get(id=tag_manifest.id)
        except TagManifest.DoesNotExist:
            return True

        # Ensure it wasn't already created.
        if lookup_map_row(tag_manifest):
            return False

        # Check for a pre-existing manifest matching the digest in the repository. This can
        # happen if we've already created the manifest row (typically for tag reversion).
        try:
            manifest_row = Manifest.get(digest=manifest.digest,
                                        repository=tag_manifest.tag.repository)
        except Manifest.DoesNotExist:
            # Create the new-style rows for the manifest.
            try:
                manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
                                                 tag_manifest.tag.image, storage_ids)
            except IntegrityError:
                # Pre-empted.
                return False

        # Create the mapping row. If we find another was created for this tag manifest in the
        # meantime, then we've been preempted.
        try:
            TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
                                         broken=is_broken)
            return True
        except IntegrityError:
            return False
|
|
|
|
|
|
if __name__ == "__main__":
    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    backfill_enabled = app.config.get('BACKFILL_TAG_MANIFESTS', False)
    if not backfill_enabled:
        # Idle forever instead of starting the worker; this loop never exits. Presumably the
        # process is kept alive so a supervisor does not repeatedly restart it -- confirm.
        logger.debug('Manifest backfill disabled; skipping')
        while True:
            time.sleep(100000)

    ManifestBackfillWorker().start()
|