Fix after merge
This commit is contained in:
parent
eb7591183d
commit
6963107ec6
1 changed file with 0 additions and 252 deletions
@@ -1,252 +0,0 @@
import logging
import logging.config
import time

from peewee import JOIN, fn, IntegrityError

from app import app
from data.database import (UseThenDisconnect, TagManifest, TagManifestToManifest, Image,
                           Manifest, db_transaction)
from data.model import DataModelException
from data.model.image import get_parent_images
from data.model.tag import populate_manifest
from data.model.blob import get_repo_blob_by_digest, BlobDoesNotExist
from image.docker.schema1 import (DockerSchema1Manifest, ManifestException, ManifestInterface,
                                  DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE)
from workers.worker import Worker
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries


logger = logging.getLogger(__name__)

WORKER_TIMEOUT = 600
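# The timeout above is the interval at which the backfill operation is re-run by the
# base Worker class (presumably in seconds, going by its use in add_operation below).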


class BrokenManifest(ManifestInterface):
  """ Implementation of the ManifestInterface for "broken" manifests. This allows us to add the
      new manifest row while not adding any additional rows for it.
  """
  def __init__(self, digest, payload):
    self._digest = digest
    self._payload = payload

  @property
  def digest(self):
    return self._digest

  @property
  def media_type(self):
    return DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE

  @property
  def manifest_dict(self):
    return {}

  @property
  def bytes(self):
    return self._payload

  def get_layers(self, content_retriever):
    return None

  def get_legacy_image_ids(self, cr):
    return []

  def get_leaf_layer_v1_image_id(self, cr):
    return None

  @property
  def blob_digests(self):
    return []

  @property
  def local_blob_digests(self):
    return []

  def child_manifests(self, lookup_manifest_fn):
    return None

  def get_manifest_labels(self, lookup_config_fn):
    return {}

  def unsigned(self):
    return self

  def generate_legacy_layers(self, images_map, lookup_config_fn):
    return None

  def get_schema1_manifest(self, namespace_name, repo_name, tag_name, lookup_fn):
    return self

  @property
  def schema_version(self):
    return 1

  @property
  def layers_compressed_size(self):
    return None

  @property
  def is_manifest_list(self):
    return False

  @property
  def has_legacy_image(self):
    return False

  def get_requires_empty_layer_blob(self, content_retriever):
    return False

  def convert_manifest(self, media_types, namespace_name, repo_name, tag_name, lookup_fn):
    return None
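

# The worker below scans for TagManifest rows that have no TagManifestToManifest mapping
# and back-fills the new-style rows for each. Candidates are visited in random order (via
# yield_random_entries) so that concurrently running workers rarely collide on the same
# rows; when a collision does happen, the loser simply moves on.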
class ManifestBackfillWorker(Worker):
  def __init__(self):
    super(ManifestBackfillWorker, self).__init__()
    self.add_operation(self._backfill_manifests, WORKER_TIMEOUT)

  def _candidates_to_backfill(self):
    def missing_tmt_query():
      return (TagManifest
              .select()
              .join(TagManifestToManifest, JOIN.LEFT_OUTER)
              .where(TagManifestToManifest.id >> None))

    min_id = (TagManifest
              .select(fn.Min(TagManifest.id))
              .join(TagManifestToManifest, JOIN.LEFT_OUTER)
              .where(TagManifestToManifest.id >> None)
              .scalar())
    max_id = TagManifest.select(fn.Max(TagManifest.id)).scalar()
    if min_id is None or max_id is None:
      # Nothing left to backfill; the caller checks for None below.
      return None

    iterator = yield_random_entries(
      missing_tmt_query,
      TagManifest.id,
      100,
      max_id,
      min_id,
    )

    return iterator

  def _backfill_manifests(self):
    with UseThenDisconnect(app.config):
      iterator = self._candidates_to_backfill()
      if iterator is None:
        logger.debug('Found no additional manifests to backfill')
        time.sleep(10000)
        return None

      # Each entry appears to be a (candidate, abort_signal, num_remaining) tuple; setting
      # the abort signal tells the allocator that another worker already handled this block.
      for candidate, abt, _ in iterator:
        if not backfill_manifest(candidate):
          logger.info('Another worker pre-empted us for manifest: %s', candidate.id)
          abt.set()


def lookup_map_row(tag_manifest):
  try:
    TagManifestToManifest.get(tag_manifest=tag_manifest)
    return True
  except TagManifestToManifest.DoesNotExist:
    return False
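

# backfill_manifest is written to be safe under concurrent workers: it checks for an
# existing mapping row before doing any work, re-checks inside the transaction, and
# treats an IntegrityError on insert as a signal that another worker won the race.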
def backfill_manifest(tag_manifest):
  logger.info('Backfilling manifest %s', tag_manifest.id)

  # Ensure that a mapping row doesn't already exist. If it does, we've been preempted.
  if lookup_map_row(tag_manifest):
    return False

  # Parse the manifest. If we cannot parse it, then we treat the manifest as broken and just emit
  # it without additional rows or data, as it will eventually not be useful.
  is_broken = False
  try:
    manifest = DockerSchema1Manifest.for_latin1_bytes(tag_manifest.json_data, validate=False)
  except ManifestException:
    logger.exception('Exception when trying to parse manifest %s', tag_manifest.id)
    manifest = BrokenManifest(tag_manifest.digest, tag_manifest.json_data)
    is_broken = True

  # Lookup the storages for the digests.
  root_image = tag_manifest.tag.image
  repository = tag_manifest.tag.repository

  image_storage_id_map = {root_image.storage.content_checksum: root_image.storage.id}

  try:
    parent_images = get_parent_images(repository.namespace_user.username, repository.name,
                                      root_image)
  except DataModelException:
    logger.exception('Exception when trying to load parent images for manifest `%s`',
                     tag_manifest.id)
    parent_images = {}
    is_broken = True

  for parent_image in parent_images:
    image_storage_id_map[parent_image.storage.content_checksum] = parent_image.storage.id

  # Ensure that all the expected blobs have been found. If not, we look up the blob under the repo
  # and add its storage ID. If the blob is not found, we mark the manifest as broken.
  storage_ids = set()
  for blob_digest in manifest.blob_digests:
    if blob_digest in image_storage_id_map:
      storage_ids.add(image_storage_id_map[blob_digest])
    else:
      logger.debug('Blob `%s` not found in images for manifest `%s`; checking repo',
                   blob_digest, tag_manifest.id)
      try:
        blob_storage = get_repo_blob_by_digest(repository.namespace_user.username, repository.name,
                                               blob_digest)
        storage_ids.add(blob_storage.id)
      except BlobDoesNotExist:
        logger.debug('Blob `%s` not found in repo for manifest `%s`',
                     blob_digest, tag_manifest.id)
        is_broken = True

  with db_transaction():
    # Re-retrieve the tag manifest to ensure it still exists and we're pointing at the correct tag.
    try:
      tag_manifest = TagManifest.get(id=tag_manifest.id)
    except TagManifest.DoesNotExist:
      return True

    # Ensure it wasn't already created.
    if lookup_map_row(tag_manifest):
      return False

    # Check for a pre-existing manifest matching the digest in the repository. This can happen
    # if we've already created the manifest row (typically for tag reversion).
    try:
      manifest_row = Manifest.get(digest=manifest.digest, repository=tag_manifest.tag.repository)
    except Manifest.DoesNotExist:
      # Create the new-style rows for the manifest.
      try:
        manifest_row = populate_manifest(tag_manifest.tag.repository, manifest,
                                         tag_manifest.tag.image, storage_ids)
      except IntegrityError:
        # Pre-empted.
        return False

    # Create the mapping row. If we find another was created for this tag manifest in the
    # meantime, then we've been preempted.
    try:
      TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row,
                                   broken=is_broken)
      return True
    except IntegrityError:
      return False
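

# If the backfill is disabled via configuration, the process sleeps forever rather than
# exiting, presumably so that a process supervisor does not keep restarting the worker.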
if __name__ == "__main__":
  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

  if not app.config.get('BACKFILL_TAG_MANIFESTS', False):
    logger.debug('Manifest backfill disabled; skipping')
    while True:
      time.sleep(100000)

  worker = ManifestBackfillWorker()
  worker.start()