Fixes to ensuring existing code can process schema 2 manifests

Joseph Schorr 2018-11-13 17:13:51 +02:00
parent 9474fb7833
commit 7b9f56eff3
10 changed files with 91 additions and 21 deletions


@@ -282,7 +282,7 @@ class RegistryDataInterface(object):
     """
     Mounts the blob from another repository into the specified target repository, and adds an
     expiration before that blob is automatically GCed. This function is useful during push
-    operations if an existing blob from another repositroy is being pushed. Returns False if
+    operations if an existing blob from another repository is being pushed. Returns False if
     the mounting fails. Note that this function does *not* check security for mounting the blob
     and the caller is responsible for doing this check (an example can be found in
     endpoints/v2/blob.py).
@@ -293,3 +293,7 @@ class RegistryDataInterface(object):
     """
     Sets the expiration on all tags that point to the given manifest to that specified.
     """
+
+  @abstractmethod
+  def get_schema1_parsed_manifest(self, manifest, namespace_name, repo_name, tag_name, storage):
+    """ Returns the schema 1 version of this manifest, or None if none. """


@@ -8,9 +8,11 @@ from data import model
 from data.model import oci, DataModelException
 from data.database import db_transaction, Image
 from data.registry_model.interface import RegistryDataInterface
-from data.registry_model.datatypes import Tag, Manifest, LegacyImage, Label, SecurityScanStatus
+from data.registry_model.datatypes import (Tag, Manifest, LegacyImage, Label, SecurityScanStatus,
+                                           RepositoryReference)
 from data.registry_model.shared import SharedModel
 from data.registry_model.label_handlers import apply_label_to_manifest
+from image.docker import ManifestException


 logger = logging.getLogger(__name__)
@@ -420,5 +422,27 @@ class OCIModel(SharedModel, RegistryDataInterface):
     """
     oci.tag.set_tag_expiration_sec_for_manifest(manifest._db_id, expiration_sec)

+  def get_schema1_parsed_manifest(self, manifest, namespace_name, repo_name, tag_name, storage):
+    """ Returns the schema 1 manifest for this manifest, or None if none. """
+    try:
+      parsed = manifest.get_parsed_manifest()
+    except ManifestException:
+      return None
+
+    try:
+      manifest_row = database.Manifest.get(id=manifest._db_id)
+    except database.Manifest.DoesNotExist:
+      return None
+
+    repository_ref = RepositoryReference.for_id(manifest_row.repository_id)
+
+    def _lookup_blob(digest):
+      blob = self.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
+      if blob is None:
+        return None
+
+      return storage.get_content(blob.placements, blob.storage_path)
+
+    return parsed.get_v1_compatible_manifest(namespace_name, repo_name, tag_name, _lookup_blob)
+

 oci_model = OCIModel()
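
The conversion to schema 1 needs the raw contents of blobs referenced by the schema 2 manifest (notably the image config), which is why `get_v1_compatible_manifest` is handed a lookup callback. A minimal sketch of that callback contract, with storage stubbed out by a plain dict (everything other than the call shown in the diff above is an assumption for illustration):

  # The callback maps a digest string to the blob's raw bytes, or None if unknown.
  def make_blob_lookup(blob_contents_by_digest):
    def _lookup_blob(digest):
      return blob_contents_by_digest.get(digest)
    return _lookup_blob

  # parsed.get_v1_compatible_manifest('devtable', 'simple', 'latest',
  #                                   make_blob_lookup({'sha256:...': b'{"architecture": ...}'}))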


@@ -527,4 +527,12 @@ class PreOCIModel(SharedModel, RegistryDataInterface):
     model.tag.set_tag_expiration_for_manifest(tag_manifest, expiration_sec)

+  def get_schema1_parsed_manifest(self, manifest, namespace_name, repo_name, tag_name, storage):
+    """ Returns the schema 1 version of this manifest, or None if none. """
+    try:
+      return manifest.get_parsed_manifest()
+    except ManifestException:
+      return None
+

 pre_oci_model = PreOCIModel()


@@ -317,7 +317,8 @@ class SharedModel:
       logger.exception('Could not parse and validate manifest `%s`', manifest._db_id)
       return None

-    blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id, parsed.checksums)
+    blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id,
+                                                                        parsed.blob_digests)
     storage_map = {blob.content_checksum: blob for blob in blob_query}
     manifest_layers = []


@@ -95,6 +95,9 @@ def test_lookup_manifests(repo_namespace, repo_name, registry_model):
   assert found.legacy_image
   assert found.legacy_image.parents

+  schema1_parsed = registry_model.get_schema1_parsed_manifest(found, 'foo', 'bar', 'baz', storage)
+  assert schema1_parsed is not None
+

 def test_lookup_unknown_manifest(registry_model):
   repo = model.repository.get_repository('devtable', 'simple')


@@ -56,7 +56,13 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
     # Something went wrong.
     raise ManifestInvalid()

-  manifest = _rewrite_to_schema1_if_necessary(namespace_name, repo_name, manifest_ref, manifest)
+  try:
+    parsed = manifest.get_parsed_manifest()
+  except ManifestException:
+    logger.exception('Got exception when trying to parse manifest `%s`', manifest_ref)
+    raise ManifestInvalid()
+
+  manifest = _rewrite_to_schema1_if_necessary(namespace_name, repo_name, manifest_ref, parsed)
   if manifest is None:
     raise ManifestUnknown()
@@ -65,7 +71,7 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
   metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])

   return Response(
-      manifest.manifest_bytes,
+      manifest.bytes,
       status=200,
       headers={
         'Content-Type': manifest.media_type,
@@ -88,14 +94,20 @@ def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref):
   if manifest is None:
     raise ManifestUnknown()

-  manifest = _rewrite_to_schema1_if_necessary(namespace_name, repo_name, '$digest', manifest)
+  try:
+    parsed = manifest.get_parsed_manifest()
+  except ManifestException:
+    logger.exception('Got exception when trying to parse manifest `%s`', manifest_ref)
+    raise ManifestInvalid()
+
+  manifest = _rewrite_to_schema1_if_necessary(namespace_name, repo_name, '$digest', parsed)
   if manifest is None:
     raise ManifestUnknown()

   track_and_log('pull_repo', repository_ref, manifest_digest=manifest_ref)
   metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True])

-  return Response(manifest.manifest_bytes, status=200, headers={
+  return Response(manifest.bytes, status=200, headers={
     'Content-Type': manifest.media_type,
     'Docker-Content-Digest': manifest.digest,
   })
@@ -106,7 +118,8 @@ def _rewrite_to_schema1_if_necessary(namespace_name, repo_name, tag_name, manife
   # media type is not in the Accept header, we return a schema 1 version of the manifest for
   # the amd64+linux platform, if any, or None if none.
   # See: https://docs.docker.com/registry/spec/manifest-v2-2
-  if len(request.accept_mimetypes) != 0 and manifest.media_type in request.accept_mimetypes:
+  mimetypes = [mimetype for mimetype, _ in request.accept_mimetypes]
+  if manifest.media_type in mimetypes:
     return manifest

   def lookup_fn(config_or_manifest_digest):
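
The switch from a membership test on `request.accept_mimetypes` to an exact check against the listed mimetypes matters because Werkzeug's `MIMEAccept.__contains__` performs wildcard matching. A small standalone sketch of the difference (it assumes Werkzeug is installed; the header value is illustrative):

  from werkzeug.datastructures import MIMEAccept

  SCHEMA2_TYPE = 'application/vnd.docker.distribution.manifest.v2+json'

  # A client that only sends a wildcard Accept header.
  accept = MIMEAccept([('*/*', 1)])

  # The old check: wildcard matching makes this client look schema 2 capable.
  assert SCHEMA2_TYPE in accept

  # The new check: the exact media type was never listed, so the manifest
  # would be rewritten to schema 1 for this client.
  assert SCHEMA2_TYPE not in [mimetype for mimetype, _ in accept]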


@@ -16,6 +16,7 @@ from data.registry_model import registry_model
 from endpoints.decorators import anon_protect, anon_allowed, route_show_if, parse_repository_name
 from endpoints.v2.blob import BLOB_DIGEST_ROUTE
 from image.appc import AppCImageFormatter
+from image.docker import ManifestException
 from image.docker.squashed import SquashedDockerImageFormatter
 from storage import Storage
 from util.audit import track_and_log, wrap_repository
@@ -42,7 +43,7 @@ class VerbReporter(TarLayerFormatterReporter):
     metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])


-def _open_stream(formatter, tag, manifest, derived_image_id, handlers, reporter):
+def _open_stream(formatter, tag, manifest, schema1_manifest, derived_image_id, handlers, reporter):
   """
   This method generates a stream of data which will be replicated and read from the queue files.
   This method runs in a separate process.
@@ -68,7 +69,7 @@ def _open_stream(formatter, tag, manifest, derived_image_id, handlers, reporter)
     for layer in reversed(layers):
       yield image_stream_getter(store, layer.blob)

-  stream = formatter.build_stream(tag, manifest, derived_image_id, layers,
+  stream = formatter.build_stream(tag, schema1_manifest, derived_image_id, layers,
                                   tar_stream_getter_iterator, reporter=reporter)

   for handler_fn in handlers:
@@ -220,9 +221,21 @@ def _verify_repo_verb(_, namespace, repo_name, tag_name, verb, checker=None):
     logger.debug('Could not get manifest on %s/%s:%s::%s', namespace, repo_name, tag.name, verb)
     abort(404)

+  # Ensure the manifest is not a list.
+  try:
+    schema1_manifest = registry_model.get_schema1_parsed_manifest(manifest, namespace,
+                                                                  repo_name, tag.name,
+                                                                  storage)
+  except ManifestException:
+    logger.exception('Could not get manifest on %s/%s:%s::%s', namespace, repo_name, tag.name, verb)
+    abort(400)
+
+  if schema1_manifest is None:
+    abort(404)
+
   # If there is a data checker, call it first.
   if checker is not None:
-    if not checker(tag, manifest):
+    if not checker(tag, schema1_manifest):
       logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repo_name, tag.name, verb)
       abort(404)
@@ -230,12 +243,12 @@ def _verify_repo_verb(_, namespace, repo_name, tag_name, verb, checker=None):
   assert tag.repository.namespace_name
   assert tag.repository.name

-  return tag, manifest
+  return tag, manifest, schema1_manifest


 def _repo_verb_signature(namespace, repository, tag_name, verb, checker=None, **kwargs):
   # Verify that the tag exists and that we have access to it.
-  tag, manifest = _verify_repo_verb(storage, namespace, repository, tag_name, verb, checker)
+  tag, manifest, _ = _verify_repo_verb(storage, namespace, repository, tag_name, verb, checker)

   # Find the derived image storage for the verb.
   derived_image = registry_model.lookup_derived_image(manifest, verb,
@@ -261,7 +274,8 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
   # Verify that the image exists and that we have access to it.
   logger.debug('Verifying repo verb %s for repository %s/%s with user %s with mimetype %s',
                verb, namespace, repository, get_authenticated_user(), request.accept_mimetypes.best)
-  tag, manifest = _verify_repo_verb(storage, namespace, repository, tag_name, verb, checker)
+  tag, manifest, schema1_manifest = _verify_repo_verb(storage, namespace, repository,
+                                                      tag_name, verb, checker)

   # Load the repository for later.
   repo = model.repository.get_repository(namespace, repository)
@@ -323,7 +337,7 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
   # and send the results to the client and storage.
   handlers = [hasher.update]
   reporter = VerbReporter(verb)
-  args = (formatter, tag, manifest, derived_image.unique_id, handlers, reporter)
+  args = (formatter, tag, manifest, schema1_manifest, derived_image.unique_id, handlers, reporter)
   queue_process = QueueProcess(
     _open_stream,
     8 * 1024,
@@ -360,7 +374,7 @@ def _repo_verb(namespace, repository, tag_name, verb, formatter, sign=False, che
 def os_arch_checker(os, arch):
   def checker(tag, manifest):
     try:
-      image_json = json.loads(manifest.get_parsed_manifest().leaf_layer.raw_v1_metadata)
+      image_json = json.loads(manifest.leaf_layer.raw_v1_metadata)
     except ValueError:
       logger.exception('Could not parse leaf layer JSON for manifest %s', manifest)
       return False
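
With this change the checker callbacks passed to `_verify_repo_verb` receive the already-parsed schema 1 manifest, so they can read the leaf layer's v1 metadata directly instead of re-parsing. A self-contained sketch of that shape, using invented stand-in objects rather than the real datatypes:

  import json

  class FakeLeafLayer(object):
    # Stand-in for the parsed schema 1 manifest's leaf layer (values invented).
    raw_v1_metadata = json.dumps({'os': 'linux', 'architecture': 'amd64'})

  class FakeSchema1Manifest(object):
    leaf_layer = FakeLeafLayer()

  def checker(tag, schema1_manifest):
    # Mirrors the os_arch_checker pattern: inspect the v1 metadata of the leaf layer.
    try:
      image_json = json.loads(schema1_manifest.leaf_layer.raw_v1_metadata)
    except ValueError:
      return False
    return image_json.get('os', 'linux') == 'linux' and \
           image_json.get('architecture', 'amd64') == 'amd64'

  assert checker(None, FakeSchema1Manifest())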


@@ -18,10 +18,9 @@ class AppCImageFormatter(TarImageFormatter):
   Image formatter which produces an tarball according to the AppC specification.
   """

-  def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator,
+  def stream_generator(self, tag, parsed_manifest, synthetic_image_id, layer_iterator,
                        tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
-    parsed_manifest = manifest.get_parsed_manifest()
     created = parsed_manifest.created_datetime
     if created is not None:
       image_mtime = calendar.timegm(created.utctimetuple())


@@ -170,7 +170,7 @@ class DockerSchema2Config(object):
   def __init__(self, config_bytes):
     self._config_bytes = config_bytes

     try:
       self._parsed = json.loads(config_bytes)
     except ValueError as ve:
@@ -191,6 +191,11 @@ class DockerSchema2Config(object):
     """ Returns the size of this config object. """
     return len(self._config_bytes)

+  @property
+  def bytes(self):
+    """ Returns the bytes of this config object. """
+    return self._config_bytes
+
   @property
   def labels(self):
     """ Returns a dictionary of all the labels defined in this configuration. """


@@ -28,10 +28,9 @@ class SquashedDockerImageFormatter(TarImageFormatter):
   # daemon dies when trying to load the entire tar into memory.
   SIZE_MULTIPLIER = 1.2

-  def stream_generator(self, tag, manifest, synthetic_image_id, layer_iterator,
+  def stream_generator(self, tag, parsed_manifest, synthetic_image_id, layer_iterator,
                        tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
-    parsed_manifest = manifest.get_parsed_manifest()
     created = parsed_manifest.created_datetime
     if created is not None:
       image_mtime = calendar.timegm(created.utctimetuple())