Merge pull request #3192 from quay/fix-blobs-stored-for-manifest
Fix blobs stored for manifest
This commit is contained in:
commit
c7baf6cf58
9 changed files with 93 additions and 48 deletions
|
@ -1407,14 +1407,12 @@ class ManifestBlob(BaseModel):
|
|||
repository = ForeignKeyField(Repository, index=True)
|
||||
manifest = ForeignKeyField(Manifest)
|
||||
blob = ForeignKeyField(ImageStorage)
|
||||
blob_index = IntegerField() # 0-indexed location of the blob in the manifest.
|
||||
|
||||
class Meta:
|
||||
database = db
|
||||
read_slaves = (read_slave,)
|
||||
indexes = (
|
||||
(('manifest', 'blob'), True),
|
||||
(('manifest', 'blob_index'), True),
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
"""Remove blob_index from ManifestBlob table
|
||||
|
||||
Revision ID: eafdeadcebc7
|
||||
Revises: 9093adccc784
|
||||
Create Date: 2018-08-07 15:57:54.001225
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'eafdeadcebc7'
|
||||
down_revision = '9093adccc784'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
def upgrade(tables, tester):
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index('manifestblob_manifest_id_blob_index', table_name='manifestblob')
|
||||
op.drop_column('manifestblob', 'blob_index')
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(tables, tester):
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('manifestblob', sa.Column('blob_index', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False))
|
||||
op.create_index('manifestblob_manifest_id_blob_index', 'manifestblob', ['manifest_id', 'blob_index'], unique=True)
|
||||
# ### end Alembic commands ###
|
|
@ -4,7 +4,7 @@ from calendar import timegm
|
|||
from uuid import uuid4
|
||||
|
||||
from peewee import IntegrityError, JOIN, fn
|
||||
from data.model import (image, db_transaction, DataModelException, _basequery,
|
||||
from data.model import (image, storage, db_transaction, DataModelException, _basequery,
|
||||
InvalidManifestException, TagAlreadyCreatedException, StaleTagException,
|
||||
config)
|
||||
from data.database import (RepositoryTag, Repository, Image, ImageStorage, Namespace, TagManifest,
|
||||
|
@ -549,8 +549,8 @@ def restore_tag_to_image(repo_obj, tag_name, docker_image_id):
|
|||
return existing_image
|
||||
|
||||
|
||||
def store_tag_manifest(namespace_name, repository_name, tag_name, manifest, leaf_layer_id=None,
|
||||
reversion=False):
|
||||
def store_tag_manifest_for_testing(namespace_name, repository_name, tag_name, manifest,
|
||||
leaf_layer_id, storage_id_map):
|
||||
""" Stores a tag manifest for a specific tag name in the database. Returns the TagManifest
|
||||
object, as well as a boolean indicating whether the TagManifest was created.
|
||||
"""
|
||||
|
@ -559,22 +559,16 @@ def store_tag_manifest(namespace_name, repository_name, tag_name, manifest, leaf
|
|||
except Repository.DoesNotExist:
|
||||
raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
|
||||
|
||||
return store_tag_manifest_for_repo(repo.id, tag_name, manifest, leaf_layer_id=leaf_layer_id,
|
||||
reversion=False)
|
||||
return store_tag_manifest_for_repo(repo.id, tag_name, manifest, leaf_layer_id, storage_id_map)
|
||||
|
||||
|
||||
def store_tag_manifest_for_repo(repository_id, tag_name, manifest, leaf_layer_id=None,
|
||||
def store_tag_manifest_for_repo(repository_id, tag_name, manifest, leaf_layer_id, storage_id_map,
|
||||
reversion=False):
|
||||
""" Stores a tag manifest for a specific tag name in the database. Returns the TagManifest
|
||||
object, as well as a boolean indicating whether the TagManifest was created.
|
||||
"""
|
||||
# Lookup all blobs in the manifest.
|
||||
blobs = ImageStorage.select().where(ImageStorage.content_checksum << list(manifest.blob_digests))
|
||||
blob_map = {blob.content_checksum: blob for blob in blobs}
|
||||
|
||||
docker_image_id = leaf_layer_id or manifest.leaf_layer_v1_image_id
|
||||
with db_transaction():
|
||||
tag = create_or_update_tag_for_repo(repository_id, tag_name, docker_image_id,
|
||||
tag = create_or_update_tag_for_repo(repository_id, tag_name, leaf_layer_id,
|
||||
reversion=reversion)
|
||||
|
||||
try:
|
||||
|
@ -583,7 +577,7 @@ def store_tag_manifest_for_repo(repository_id, tag_name, manifest, leaf_layer_id
|
|||
manifest.save()
|
||||
return manifest, False
|
||||
except TagManifest.DoesNotExist:
|
||||
return _create_manifest(tag, manifest, blob_map), True
|
||||
return _create_manifest(tag, manifest, storage_id_map), True
|
||||
|
||||
|
||||
def get_active_tag(namespace, repo_name, tag_name):
|
||||
|
@ -604,16 +598,12 @@ def get_possibly_expired_tag(namespace, repo_name, tag_name):
|
|||
Namespace.username == namespace)).get()
|
||||
|
||||
|
||||
def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest):
|
||||
def associate_generated_tag_manifest(namespace, repo_name, tag_name, manifest, storage_id_map):
|
||||
tag = get_active_tag(namespace, repo_name, tag_name)
|
||||
|
||||
# Lookup all blobs in the manifest.
|
||||
blobs = ImageStorage.select().where(ImageStorage.content_checksum << list(manifest.blob_digests))
|
||||
blob_map = {blob.content_checksum: blob for blob in blobs}
|
||||
return _create_manifest(tag, manifest, blob_map)
|
||||
return _create_manifest(tag, manifest, storage_id_map)
|
||||
|
||||
|
||||
def _create_manifest(tag, manifest, blob_map):
|
||||
def _create_manifest(tag, manifest, storage_id_map):
|
||||
media_type = Manifest.media_type.get_id(manifest.media_type)
|
||||
|
||||
with db_transaction():
|
||||
|
@ -622,17 +612,17 @@ def _create_manifest(tag, manifest, blob_map):
|
|||
ManifestLegacyImage.create(manifest=manifest_row, repository=tag.repository, image=tag.image)
|
||||
|
||||
blobs_created = set()
|
||||
for index, blob_digest in enumerate(reversed(manifest.blob_digests)):
|
||||
image_storage = blob_map.get(blob_digest)
|
||||
if image_storage is None:
|
||||
raise DataModelException('Missing blob for manifest')
|
||||
for blob_digest in reversed(manifest.blob_digests):
|
||||
image_storage_id = storage_id_map.get(blob_digest)
|
||||
if image_storage_id is None:
|
||||
logger.error('Missing blob for manifest `%s` in: %s', blob_digest, storage_id_map)
|
||||
raise DataModelException('Missing blob for manifest `%s`' % blob_digest)
|
||||
|
||||
if image_storage.id in blobs_created:
|
||||
if image_storage_id in blobs_created:
|
||||
continue
|
||||
|
||||
blobs_created.add(image_storage.id)
|
||||
ManifestBlob.create(manifest=manifest_row, repository=tag.repository, blob=image_storage,
|
||||
blob_index=index)
|
||||
ManifestBlob.create(manifest=manifest_row, repository=tag.repository, blob=image_storage_id)
|
||||
blobs_created.add(image_storage_id)
|
||||
|
||||
tag_manifest = TagManifest.create(tag=tag, digest=manifest.digest, json_data=manifest.bytes)
|
||||
TagManifestToManifest.create(tag_manifest=tag_manifest, manifest=manifest_row)
|
||||
|
|
|
@ -65,15 +65,17 @@ def create_image(docker_image_id, repository_obj, username):
|
|||
|
||||
def store_tag_manifest(namespace, repo_name, tag_name, image_id):
|
||||
builder = DockerSchema1ManifestBuilder(namespace, repo_name, tag_name)
|
||||
storage_id_map = {}
|
||||
try:
|
||||
image_storage = ImageStorage.select().where(~(ImageStorage.content_checksum >> None)).get()
|
||||
builder.add_layer(image_storage.content_checksum, '{"id": "foo"}')
|
||||
storage_id_map[image_storage.content_checksum] = image_storage.id
|
||||
except ImageStorage.DoesNotExist:
|
||||
pass
|
||||
|
||||
manifest = builder.build(docker_v2_signing_key)
|
||||
manifest_row, _ = model.tag.store_tag_manifest(namespace, repo_name, tag_name, manifest,
|
||||
leaf_layer_id=image_id)
|
||||
manifest_row, _ = model.tag.store_tag_manifest_for_testing(namespace, repo_name, tag_name,
|
||||
manifest, image_id, storage_id_map)
|
||||
return manifest_row
|
||||
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ from data.database import (Image, RepositoryTag, ImageStorage, Repository, Manif
|
|||
from data.model.repository import create_repository
|
||||
from data.model.tag import (list_active_repo_tags, create_or_update_tag, delete_tag,
|
||||
get_matching_tags, _tag_alive, get_matching_tags_for_images,
|
||||
change_tag_expiration, get_active_tag, store_tag_manifest)
|
||||
change_tag_expiration, get_active_tag, store_tag_manifest_for_testing)
|
||||
from data.model.image import find_create_or_link_image
|
||||
from image.docker.schema1 import DockerSchema1ManifestBuilder
|
||||
from util.timedeltastring import convert_to_timedelta
|
||||
|
@ -220,21 +220,37 @@ def test_change_tag_expiration(expiration_offset, expected_offset, initialized_d
|
|||
assert (expected_end_date - end_date).total_seconds() < 5 # variance in test
|
||||
|
||||
|
||||
def test_store_tag_manifest(initialized_db):
|
||||
def random_storages():
|
||||
return list(ImageStorage.select().where(~(ImageStorage.content_checksum >> None)).limit(10))
|
||||
|
||||
|
||||
def repeated_storages():
|
||||
storages = list(ImageStorage.select().where(~(ImageStorage.content_checksum >> None)).limit(5))
|
||||
return storages + storages
|
||||
|
||||
|
||||
@pytest.mark.parametrize('get_storages', [
|
||||
random_storages,
|
||||
repeated_storages,
|
||||
])
|
||||
def test_store_tag_manifest(get_storages, initialized_db):
|
||||
# Create a manifest with some layers.
|
||||
builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'sometag')
|
||||
|
||||
storages = list(ImageStorage.select().where(~(ImageStorage.content_checksum >> None)).limit(10))
|
||||
storages = get_storages()
|
||||
assert storages
|
||||
|
||||
repo = model.repository.get_repository('devtable', 'simple')
|
||||
storage_id_map = {}
|
||||
for index, storage in enumerate(storages):
|
||||
image_id = 'someimage%s' % index
|
||||
builder.add_layer(storage.content_checksum, json.dumps({'id': image_id}))
|
||||
find_create_or_link_image(image_id, repo, 'devtable', {}, 'local_us')
|
||||
storage_id_map[storage.content_checksum] = storage.id
|
||||
|
||||
manifest = builder.build(docker_v2_signing_key)
|
||||
tag_manifest, _ = store_tag_manifest('devtable', 'simple', 'sometag', manifest)
|
||||
tag_manifest, _ = store_tag_manifest_for_testing('devtable', 'simple', 'sometag', manifest,
|
||||
manifest.leaf_layer_v1_image_id, storage_id_map)
|
||||
|
||||
# Ensure we have the new-model expected rows.
|
||||
mapping_row = TagManifestToManifest.get(tag_manifest=tag_manifest)
|
||||
|
|
|
@ -157,10 +157,10 @@ def _write_manifest(namespace_name, repo_name, manifest):
|
|||
raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'})
|
||||
|
||||
# Ensure all the blobs in the manifest exist.
|
||||
storage_map = model.lookup_blobs_by_digest(repo, manifest.checksums)
|
||||
blob_map = model.lookup_blobs_by_digest(repo, manifest.checksums)
|
||||
for layer in manifest.layers:
|
||||
digest_str = str(layer.digest)
|
||||
if digest_str not in storage_map:
|
||||
if digest_str not in blob_map:
|
||||
raise BlobUnknown(detail={'digest': digest_str})
|
||||
|
||||
# Lookup all the images and their parent images (if any) inside the manifest.
|
||||
|
@ -177,7 +177,7 @@ def _write_manifest(namespace_name, repo_name, manifest):
|
|||
if not rewritten_image.image_id in images_map:
|
||||
model.synthesize_v1_image(
|
||||
repo,
|
||||
storage_map[rewritten_image.content_checksum],
|
||||
blob_map[rewritten_image.content_checksum],
|
||||
rewritten_image.image_id,
|
||||
rewritten_image.created,
|
||||
rewritten_image.comment,
|
||||
|
@ -191,7 +191,7 @@ def _write_manifest(namespace_name, repo_name, manifest):
|
|||
|
||||
# Store the manifest pointing to the tag.
|
||||
leaf_layer_id = rewritten_images[-1].image_id
|
||||
newly_created = model.save_manifest(repo, manifest.tag, manifest, leaf_layer_id)
|
||||
newly_created = model.save_manifest(repo, manifest.tag, manifest, leaf_layer_id, blob_map)
|
||||
if newly_created:
|
||||
# TODO: make this batch
|
||||
labels = []
|
||||
|
@ -202,18 +202,18 @@ def _write_manifest(namespace_name, repo_name, manifest):
|
|||
|
||||
model.create_manifest_labels(namespace_name, repo_name, manifest.digest, labels)
|
||||
|
||||
return repo, storage_map
|
||||
return repo, blob_map
|
||||
|
||||
|
||||
def _write_manifest_and_log(namespace_name, repo_name, manifest):
|
||||
repo, storage_map = _write_manifest(namespace_name, repo_name, manifest)
|
||||
repo, blob_map = _write_manifest(namespace_name, repo_name, manifest)
|
||||
|
||||
# Queue all blob manifests for replication.
|
||||
if features.STORAGE_REPLICATION:
|
||||
with queue_replication_batch(namespace_name) as queue_storage_replication:
|
||||
for layer in manifest.layers:
|
||||
digest_str = str(layer.digest)
|
||||
queue_storage_replication(storage_map[digest_str])
|
||||
queue_storage_replication(blob_map[digest_str])
|
||||
|
||||
track_and_log('push_repo', repo, tag=manifest.tag)
|
||||
spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]})
|
||||
|
@ -254,6 +254,9 @@ def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref):
|
|||
|
||||
|
||||
def _generate_and_store_manifest(namespace_name, repo_name, tag_name):
|
||||
""" Generates and stores a manifest for an existing V1-only tag. """
|
||||
# TODO(jschorr): Remove once we are fully on Manifest-based model.
|
||||
|
||||
# Find the v1 metadata for this image and its parents.
|
||||
v1_metadata = model.get_docker_v1_metadata_by_tag(namespace_name, repo_name, tag_name)
|
||||
parents_v1_metadata = model.get_parents_docker_v1_metadata(namespace_name, repo_name,
|
||||
|
|
|
@ -155,7 +155,7 @@ class DockerRegistryV2DataInterface(object):
|
|||
pass
|
||||
|
||||
@abstractmethod
|
||||
def save_manifest(self, repository, tag_name, manifest):
|
||||
def save_manifest(self, repository, tag_name, manifest, blob_map):
|
||||
"""
|
||||
Saves a manifest, under the matching repository as a tag with the given name.
|
||||
|
||||
|
|
|
@ -93,8 +93,15 @@ class PreOCIModel(DockerRegistryV2DataInterface):
|
|||
|
||||
def create_manifest_and_update_tag(self, namespace_name, repo_name, tag_name, manifest):
|
||||
assert isinstance(manifest, ManifestInterface)
|
||||
repo = model.repository.get_repository(namespace_name, repo_name)
|
||||
if repo is None:
|
||||
return
|
||||
|
||||
blob_map = self.lookup_blobs_by_digest(repo, manifest.checksums)
|
||||
storage_map = {blob.digest: blob.id for blob_digest, blob in blob_map.iteritems()}
|
||||
try:
|
||||
model.tag.associate_generated_tag_manifest(namespace_name, repo_name, tag_name, manifest)
|
||||
model.tag.associate_generated_tag_manifest(namespace_name, repo_name, tag_name, manifest,
|
||||
storage_map)
|
||||
except IntegrityError:
|
||||
# It's already there!
|
||||
pass
|
||||
|
@ -112,10 +119,11 @@ class PreOCIModel(DockerRegistryV2DataInterface):
|
|||
parent_image)
|
||||
return _docker_v1_metadata(repository.namespace_name, repository.name, repo_image)
|
||||
|
||||
def save_manifest(self, repository, tag_name, manifest, leaf_layer_id=None):
|
||||
def save_manifest(self, repository, tag_name, manifest, leaf_layer_id, blob_map):
|
||||
assert isinstance(manifest, ManifestInterface)
|
||||
storage_map = {blob.digest: blob.id for blob_digest, blob in blob_map.iteritems()}
|
||||
(_, newly_created) = model.tag.store_tag_manifest_for_repo(repository.id, tag_name, manifest,
|
||||
leaf_layer_id=leaf_layer_id)
|
||||
leaf_layer_id, storage_map)
|
||||
return newly_created
|
||||
|
||||
def repository_tags(self, namespace_name, repo_name, start_id, limit):
|
||||
|
|
|
@ -52,4 +52,4 @@ def test_e2e_query_count_manifest_norewrite(client, app):
|
|||
conduct_call(client, 'v2.write_manifest_by_digest', url_for, 'PUT', params, expected_code=202,
|
||||
headers=headers, raw_body=tag_manifest.json_data)
|
||||
|
||||
assert counter.count < 15
|
||||
assert counter.count <= 15
|
||||
|
|
Reference in a new issue