Fix pulling of squashed versions of the legacy image in manifest lists

This commit is contained in:
Joseph Schorr 2018-11-20 16:36:49 +02:00
parent 001768c043
commit 1f03fdb27e
10 changed files with 198 additions and 31 deletions

View file

@ -197,6 +197,13 @@ class RegistryDataInterface(object):
Returns None if the manifest could not be parsed and validated.
"""
@abstractmethod
def list_parsed_manifest_layers(self, repository_ref, parsed_manifest, include_placements=False):
  """ Lists the layers of the given parsed manifest as an *ordered list*, beginning with the
      base layer and ending with the leaf layer. Each entry includes its associated Blob and,
      when `include_placements` is specified, that blob's placements.
  """
@abstractmethod
def lookup_derived_image(self, manifest, verb, varying_metadata=None, include_placements=False):
"""
@ -205,8 +212,8 @@ class RegistryDataInterface(object):
"""
@abstractmethod
def lookup_or_create_derived_image(self, manifest, verb, storage_location, varying_metadata=None,
include_placements=False):
def lookup_or_create_derived_image(self, manifest, verb, storage_location, storage,
varying_metadata=None, include_placements=False):
"""
Looks up the derived image for the given maniest, verb and optional varying metadata
and returns it. If none exists, a new derived image is created.

View file

@ -54,6 +54,30 @@ class OCIModel(SharedModel, RegistryDataInterface):
return tags_map
def _get_legacy_compatible_image_for_manifest(self, manifest, storage):
  """ Returns the legacy image to use for the given manifest, or None if none could be found.

      For anything other than a manifest list, the legacy image is looked up directly from the
      manifest. For a manifest list, the schema1 (v1-compatible) form of the manifest is
      retrieved and the image matching its leaf layer's V1 image ID is loaded from the
      manifest's repository.
  """
  # Non-list manifests: the legacy image hangs directly off of the manifest.
  if manifest.media_type != DOCKER_SCHEMA2_MANIFESTLIST_CONTENT_TYPE:
    return oci.shared.get_legacy_image_for_manifest(manifest._db_id)

  # Manifest lists: the manifest row is required to know the owning repository.
  try:
    manifest_row = database.Manifest.get(id=manifest._db_id)
  except database.Manifest.DoesNotExist:
    logger.exception('Could not find manifest for manifest `%s`', manifest._db_id)
    return None

  # Try to resolve the schema1 manifest within the list and, from it, the leaf V1 image ID.
  schema1_manifest = self.get_schema1_parsed_manifest(manifest, '', '', '', storage)
  leaf_image_id = None
  if schema1_manifest is not None:
    leaf_image_id = schema1_manifest.leaf_layer_v1_image_id

  if leaf_image_id is None:
    return None

  return model.image.get_image(manifest_row.repository_id, leaf_image_id)
def find_matching_tag(self, repository_ref, tag_names):
""" Finds an alive tag in the repository matching one of the given tag names and returns it
or None if none.
@ -400,7 +424,13 @@ class OCIModel(SharedModel, RegistryDataInterface):
logger.exception('Could not find manifest for manifest `%s`', manifest._db_id)
return None
return self._list_manifest_layers(manifest, manifest_obj.repository_id, include_placements)
try:
parsed = manifest.get_parsed_manifest()
except ManifestException:
logger.exception('Could not parse and validate manifest `%s`', manifest._db_id)
return None
return self._list_manifest_layers(manifest_obj.repository_id, parsed, include_placements)
def lookup_derived_image(self, manifest, verb, varying_metadata=None, include_placements=False):
"""
@ -414,13 +444,14 @@ class OCIModel(SharedModel, RegistryDataInterface):
derived = model.image.find_derived_storage_for_image(legacy_image, verb, varying_metadata)
return self._build_derived(derived, verb, varying_metadata, include_placements)
def lookup_or_create_derived_image(self, manifest, verb, storage_location, varying_metadata=None,
def lookup_or_create_derived_image(self, manifest, verb, storage_location, storage,
varying_metadata=None,
include_placements=False):
"""
Looks up the derived image for the given maniest, verb and optional varying metadata
and returns it. If none exists, a new derived image is created.
"""
legacy_image = oci.shared.get_legacy_image_for_manifest(manifest._db_id)
legacy_image = self._get_legacy_compatible_image_for_manifest(manifest, storage)
if legacy_image is None:
return None

View file

@ -11,7 +11,8 @@ from data import model
from data.database import db_transaction
from data.registry_model.interface import RegistryDataInterface
from data.registry_model.datatypes import (Tag, Manifest, LegacyImage, Label,
SecurityScanStatus, ManifestLayer, Blob, DerivedImage)
SecurityScanStatus, ManifestLayer, Blob, DerivedImage,
RepositoryReference)
from data.registry_model.shared import SharedModel
from data.registry_model.label_handlers import apply_label_to_manifest
from image.docker.schema1 import (DockerSchema1ManifestBuilder, ManifestException,
@ -489,7 +490,14 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
logger.exception('Could not find tag manifest for manifest `%s`', manifest._db_id)
return None
return self._list_manifest_layers(manifest, tag_manifest.tag.repository_id, include_placements)
try:
parsed = manifest.get_parsed_manifest()
except ManifestException:
logger.exception('Could not parse and validate manifest `%s`', manifest._db_id)
return None
repo_ref = RepositoryReference.for_id(tag_manifest.tag.repository_id)
return self.list_parsed_manifest_layers(repo_ref, parsed, include_placements)
def lookup_derived_image(self, manifest, verb, varying_metadata=None, include_placements=False):
"""
@ -506,8 +514,8 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
derived = model.image.find_derived_storage_for_image(repo_image, verb, varying_metadata)
return self._build_derived(derived, verb, varying_metadata, include_placements)
def lookup_or_create_derived_image(self, manifest, verb, storage_location, varying_metadata=None,
include_placements=False):
def lookup_or_create_derived_image(self, manifest, verb, storage_location, storage,
varying_metadata=None, include_placements=False):
"""
Looks up the derived image for the given maniest, verb and optional varying metadata
and returns it. If none exists, a new derived image is created.

View file

@ -306,17 +306,18 @@ class SharedModel:
return LegacyImage.for_image(image, images_map=parent_images_map, blob=blob)
def _list_manifest_layers(self, manifest, repo_id, include_placements=False):
def list_parsed_manifest_layers(self, repository_ref, parsed_manifest, include_placements=False):
  """ Lists the layers of the parsed manifest as an *ordered list*, from the base layer up to
      the leaf layer, each with its associated Blob and (if specified) the blob's placements.

      The work is delegated to `_list_manifest_layers`, keyed by the database ID of the given
      repository reference.
  """
  repository_id = repository_ref._db_id
  return self._list_manifest_layers(repository_id, parsed_manifest, include_placements)
def _list_manifest_layers(self, repo_id, parsed, include_placements=False):
""" Returns an *ordered list* of the layers found in the manifest, starting at the base and
working towards the leaf, including the associated Blob and its placements (if specified).
Returns None if the manifest could not be parsed and validated.
"""
try:
parsed = manifest.get_parsed_manifest()
except ManifestException:
logger.exception('Could not parse and validate manifest `%s`', manifest._db_id)
return None
storage_map = {}
if parsed.local_blob_digests:
blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id,
@ -331,11 +332,12 @@ class SharedModel:
digest_str = str(layer.digest)
if digest_str not in storage_map:
logger.error('Missing digest `%s` for manifest `%s`', layer.digest, manifest._db_id)
logger.error('Missing digest `%s` for manifest `%s`', layer.digest, parsed.digest)
return None
image_storage = storage_map[digest_str]
assert image_storage.cas_path is not None
assert image_storage.image_size is not None
placements = None
if include_placements:

View file

@ -1,5 +1,6 @@
import hashlib
import os
import tarfile
from io import BytesIO
@ -113,3 +114,36 @@ def test_extra_blob_stream_handlers(pre_oci_model):
assert ''.join(handler1_result) == 'hello world'
assert ''.join(handler2_result) == 'hello world'
def valid_tar_gz(contents):
  """ Builds a gzip-compressed tar archive holding a single regular file named `somefile`
      with the given contents, and returns the archive as bytes.

      `contents` may be a byte string or a text string; text is encoded as UTF-8 first so the
      recorded size always matches the number of bytes written.
  """
  # Normalize text to bytes: the tar header size and the BytesIO payload must both be in bytes.
  if isinstance(contents, type(u'')):
    contents = contents.encode('utf-8')

  with BytesIO() as layer_data:
    # The context manager closes the tar (flushing the gzip stream) even if addfile raises.
    with tarfile.open(fileobj=layer_data, mode='w|gz') as tar_file:
      tar_file_info = tarfile.TarInfo(name='somefile')
      tar_file_info.type = tarfile.REGTYPE
      tar_file_info.size = len(contents)
      tar_file_info.mtime = 1
      tar_file.addfile(tar_file_info, BytesIO(contents))

    # Must be read only after the tar has been closed, and before the buffer is.
    return layer_data.getvalue()
def test_uncompressed_size(pre_oci_model):
  """ Test: A blob committed from an uploaded gzipped tar layer has both its compressed and
      its uncompressed size recorded.
  """
  repo = pre_oci_model.lookup_repository('devtable', 'complex')
  fake_storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
  upload_settings = BlobUploadSettings('1K', 512 * 1024, 3600)
  app_config = {'TESTING': True}

  with upload_blob(repo, fake_storage, upload_settings) as manager:
    # Upload a single chunk containing a real tar.gz layer, then commit it as a blob.
    layer_stream = BytesIO(valid_tar_gz('hello world'))
    manager.upload_chunk(app_config, layer_stream)
    committed = manager.commit_to_blob(app_config)

  assert committed.compressed_size is not None
  assert committed.uncompressed_size is not None

View file

@ -543,8 +543,10 @@ def test_derived_image(registry_model):
assert registry_model.lookup_derived_image(manifest, 'squash', {}) is None
# Create a new one.
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {})
assert registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {}) == squashed
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash',
'local_us', storage, {})
assert registry_model.lookup_or_create_derived_image(manifest, 'squash',
'local_us', storage, {}) == squashed
assert squashed.unique_id
# Check and set the size.
@ -560,15 +562,15 @@ def test_derived_image(registry_model):
assert registry_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'}) is None
squashed_foo = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us',
{'foo': 'bar'})
storage, {'foo': 'bar'})
assert squashed_foo != squashed
assert registry_model.lookup_derived_image(manifest, 'squash', {'foo': 'bar'}) == squashed_foo
assert squashed.unique_id != squashed_foo.unique_id
# Lookup with placements.
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us', {},
include_placements=True)
squashed = registry_model.lookup_or_create_derived_image(manifest, 'squash', 'local_us',
storage, {}, include_placements=True)
assert squashed.blob.placements
# Delete the derived image.

View file

@ -43,7 +43,7 @@ class VerbReporter(TarLayerFormatterReporter):
metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])
def _open_stream(formatter, tag, manifest, schema1_manifest, derived_image_id, handlers, reporter):
def _open_stream(formatter, tag, schema1_manifest, derived_image_id, handlers, reporter):
"""
This method generates a stream of data which will be replicated and read from the queue files.
This method runs in a separate process.
@ -51,7 +51,8 @@ def _open_stream(formatter, tag, manifest, schema1_manifest, derived_image_id, h
# For performance reasons, we load the full image list here, cache it, then disconnect from
# the database.
with database.UseThenDisconnect(app.config):
layers = registry_model.list_manifest_layers(manifest, include_placements=True)
layers = registry_model.list_parsed_manifest_layers(tag.repository, schema1_manifest,
include_placements=True)
def image_stream_getter(store, blob):
def get_stream_for_storage():
@ -209,6 +210,9 @@ def _verify_repo_verb(_, namespace, repo_name, tag_name, verb, checker=None):
# Lookup the requested tag.
repo_ref = registry_model.lookup_repository(namespace, repo_name)
if repo_ref is None:
abort(404)
tag = registry_model.get_repo_tag(repo_ref, tag_name)
if tag is None:
logger.debug('Tag %s does not exist in repository %s/%s for user %s', tag, namespace, repo_name,
@ -221,7 +225,7 @@ def _verify_repo_verb(_, namespace, repo_name, tag_name, verb, checker=None):
logger.debug('Could not get manifest on %s/%s:%s::%s', namespace, repo_name, tag.name, verb)
abort(404)
# Ensure the manifest is not a list.
# Retrieve the schema1-compatible version of the manifest.
try:
schema1_manifest = registry_model.get_schema1_parsed_manifest(manifest, namespace,
repo_name, tag.name,
@ -294,8 +298,13 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
# Lookup/create the derived image for the verb and repo image.
derived_image = registry_model.lookup_or_create_derived_image(
manifest, verb, storage.preferred_locations[0], varying_metadata={'tag': tag.name},
manifest, verb, storage.preferred_locations[0], storage,
varying_metadata={'tag': tag.name},
include_placements=True)
if derived_image is None:
logger.error('Could not create or lookup a derived image for manifest %s', manifest)
abort(400)
if not derived_image.blob.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived_image)
is_head_request = request.method == 'HEAD'
@ -337,7 +346,7 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
# and send the results to the client and storage.
handlers = [hasher.update]
reporter = VerbReporter(verb)
args = (formatter, tag, manifest, schema1_manifest, derived_image.unique_id, handlers, reporter)
args = (formatter, tag, schema1_manifest, derived_image.unique_id, handlers, reporter)
queue_process = QueueProcess(
_open_stream,
8 * 1024,

View file

@ -213,7 +213,7 @@ class DockerSchema2Config(object):
command=history_entry[DOCKER_SCHEMA2_CONFIG_CREATED_BY_KEY],
is_empty=history_entry.get(DOCKER_SCHEMA2_CONFIG_EMPTY_LAYER_KEY, False))
def build_v1_compatibility(self, layer_index, v1_id, v1_parent_id):
def build_v1_compatibility(self, layer_index, v1_id, v1_parent_id, compressed_size=None):
""" Builds the V1 compatibility block for the given layer.
Note that the layer_index is 0-indexed, with the *base* layer being 0, and the leaf
@ -237,6 +237,9 @@ class DockerSchema2Config(object):
'Cmd': history[layer_index].command,
}
if compressed_size is not None:
v1_compatibility['Size'] = compressed_size
# The history and rootfs keys are schema2-config specific.
v1_compatibility.pop(DOCKER_SCHEMA2_CONFIG_HISTORY_KEY, None)
v1_compatibility.pop(DOCKER_SCHEMA2_CONFIG_ROOTFS_KEY, None)

View file

@ -30,7 +30,7 @@ DockerV2ManifestLayer = namedtuple('DockerV2ManifestLayer', ['index', 'digest',
'is_remote', 'urls',
'compressed_size'])
LayerWithV1ID = namedtuple('LayerWithV1ID', ['layer', 'v1_id', 'v1_parent_id'])
LayerWithV1ID = namedtuple('LayerWithV1ID', ['layer', 'v1_id', 'v1_parent_id', 'compressed_size'])
logger = logging.getLogger(__name__)
@ -270,7 +270,8 @@ class DockerSchema2Manifest(ManifestInterface):
digest_history.update(str(layer.index))
digest_history.update("|")
v1_layer_id = digest_history.hexdigest()
yield LayerWithV1ID(layer=layer, v1_id=v1_layer_id, v1_parent_id=v1_layer_parent_id)
yield LayerWithV1ID(layer=layer, v1_id=v1_layer_id, v1_parent_id=v1_layer_parent_id,
compressed_size=layer.compressed_size)
def populate_schema1_builder(self, v1_builder, content_retriever):
""" Populates a DockerSchema1ManifestBuilder with the layers and config from
@ -284,7 +285,8 @@ class DockerSchema2Manifest(ManifestInterface):
for layer_with_ids in reversed(layers): # Schema1 has layers in reverse order
v1_compatibility = schema2_config.build_v1_compatibility(layer_with_ids.layer.index,
layer_with_ids.v1_id,
layer_with_ids.v1_parent_id)
layer_with_ids.v1_parent_id,
layer_with_ids.compressed_size)
v1_builder.add_layer(str(layer_with_ids.layer.digest), json.dumps(v1_compatibility))
return v1_builder

View file

@ -1560,3 +1560,72 @@ def test_push_pull_manifest_list_duplicate_manifest(v22_protocol, basic_images,
# Pull and verify the manifest list.
v22_protocol.pull_list(liveserver_session, 'devtable', 'newrepo', 'latest', manifestlist,
credentials=credentials, options=options)
def test_squashed_image_unsupported(v22_protocol, basic_images, liveserver_session, liveserver,
                                    app_reloader, data_model):
  """ Test: Attempting to pull a squashed image for a manifest list without an amd64+linux entry.
  """
  # This test only runs against the OCI data model; it is a no-op for any other model.
  if data_model != 'oci_model':
    return

  credentials = ('devtable', 'password')
  options = ProtocolOptions()

  # Build the manifest that will go in the list.
  blobs = {}
  manifest = v22_protocol.build_schema2(basic_images, blobs, options)

  # Create and push the manifest list. The only entry is registered under an
  # architecture/OS pair other than amd64+linux.
  builder = DockerSchema2ManifestListBuilder()
  builder.add_manifest(manifest, 'foobar', 'someos')
  manifestlist = builder.build()

  v22_protocol.push_list(liveserver_session, 'devtable', 'newrepo', 'latest', manifestlist,
                         [manifest], blobs,
                         credentials=credentials, options=options)

  # Attempt to pull the squashed version; the request is expected to be rejected with a 404.
  response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
  assert response.status_code == 404
def test_squashed_image_manifest_list(v22_protocol, basic_images, liveserver_session, liveserver,
                                      app_reloader, data_model):
  """ Test: Pull a squashed image for a manifest list with an amd64+linux entry.
  """
  # This test only runs against the OCI data model; it is a no-op for any other model.
  if data_model != 'oci_model':
    return

  credentials = ('devtable', 'password')
  options = ProtocolOptions()

  # Build the manifest that will go in the list.
  blobs = {}
  manifest = v22_protocol.build_schema2(basic_images, blobs, options)

  # Create and push the manifest list, registering the manifest as its amd64+linux entry.
  builder = DockerSchema2ManifestListBuilder()
  builder.add_manifest(manifest, 'amd64', 'linux')
  manifestlist = builder.build()

  v22_protocol.push_list(liveserver_session, 'devtable', 'newrepo', 'latest', manifestlist,
                         [manifest], blobs,
                         credentials=credentials, options=options)

  # Pull the squashed version.
  response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
  assert response.status_code == 200

  # Verify the squashed image: the tar must contain exactly these entries, in this order.
  expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
  expected_names = ['repositories',
                    expected_image_id,
                    '%s/json' % expected_image_id,
                    '%s/VERSION' % expected_image_id,
                    '%s/layer.tar' % expected_image_id]

  # Open the archive in a context manager so it is closed even if the assertion fails.
  with tarfile.open(fileobj=StringIO(response.content)) as tar:
    assert tar.getnames() == expected_names