Merge pull request #2974 from coreos-inc/joseph.schorr/QS-118/manifest-write-query
Audit the number of SQL queries we make when writing manifests, and significantly reduce them in the common case
commit fd1237cff9
8 changed files with 145 additions and 66 deletions
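At a high level, the change resolves the repository once at the edge of the request and threads its id through new `*_for_repo` helpers, instead of having each data-model function re-resolve the repository from namespace and name. A minimal sketch of that pattern in plain Python with simulated queries follows; the helper names mirror the diff, but the bodies are illustrative stand-ins that only count pretend SQL round trips, not the Quay implementation.

# Illustrative sketch only: counts simulated SQL round trips to show why threading a
# repository id through the call chain saves queries. Not the Quay code itself.

QUERY_COUNT = 0

def _query(sql):
  """Stand-in for one SQL round trip."""
  global QUERY_COUNT
  QUERY_COUNT += 1
  return {'id': 42, 'sql': sql}

# Before: each helper re-resolves the repository from namespace/name.
def store_tag_manifest(namespace_name, repo_name, tag_name, digest):
  repo = _query('SELECT repository WHERE %s/%s' % (namespace_name, repo_name))
  create_or_update_tag(namespace_name, repo_name, tag_name)
  return _query('INSERT tag_manifest %s' % digest)

def create_or_update_tag(namespace_name, repo_name, tag_name):
  repo = _query('SELECT repository WHERE %s/%s' % (namespace_name, repo_name))
  return _query('INSERT repository_tag %s -> %s' % (tag_name, repo['id']))

# After: the repository is resolved once at the edge, and only its id is passed down.
def store_tag_manifest_for_repo(repository_id, tag_name, digest):
  create_or_update_tag_for_repo(repository_id, tag_name)
  return _query('INSERT tag_manifest %s' % digest)

def create_or_update_tag_for_repo(repository_id, tag_name):
  return _query('INSERT repository_tag %s -> %s' % (tag_name, repository_id))

if __name__ == '__main__':
  QUERY_COUNT = 0
  store_tag_manifest('devtable', 'simple', 'latest', 'sha256:abc')
  print('before: %d queries' % QUERY_COUNT)  # 4

  QUERY_COUNT = 0
  repo = _query('SELECT repository WHERE devtable/simple')  # resolved once
  store_tag_manifest_for_repo(repo['id'], 'latest', 'sha256:abc')
  print('after: %d queries' % QUERY_COUNT)   # 3

The diff below applies the same shape to the real helpers: tag and manifest writes take a `repository_id`, and the blob and v1-metadata lookups take the already-loaded repository object.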
@@ -421,8 +421,8 @@ def get_image_layers(image):
   return image_list


-def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str,
-                        comment, command, v1_json_metadata, parent_image=None):
+def synthesize_v1_image(repo, image_storage_id, storage_image_size, docker_image_id,
+                        created_date_str, comment, command, v1_json_metadata, parent_image=None):
   """ Find an existing image with this docker image id, and if none exists, write one with the
       specified metadata.
   """

@@ -439,13 +439,13 @@ def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str,
     pass

   # Get the aggregate size for the image.
-  aggregate_size = _basequery.calculate_image_aggregate_size(ancestors, image_storage.image_size,
+  aggregate_size = _basequery.calculate_image_aggregate_size(ancestors, storage_image_size,
                                                              parent_image)

   try:
     return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
                         command=command, v1_json_metadata=v1_json_metadata, created=created,
-                        storage=image_storage, repository=repo, parent=parent_image,
+                        storage=image_storage_id, repository=repo, parent=parent_image,
                         aggregate_size=aggregate_size)
   except IntegrityError:
     return Image.get(docker_image_id=docker_image_id, repository=repo)
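The `except IntegrityError` fallback above is a create-or-fetch guard: if a concurrent writer inserts the same image row first, the unique-constraint violation is caught and the existing row is read back instead. A minimal standalone sketch of the same idea using only the standard-library `sqlite3` module; the table and columns here are illustrative, not Quay's schema.

# Create-or-fetch under a unique constraint, sketched with sqlite3 (illustrative schema).
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE image (docker_image_id TEXT PRIMARY KEY, repository INTEGER)')

def synthesize_image(docker_image_id, repository_id):
  """Insert the row; if another writer already created it, fall back to reading it."""
  try:
    with conn:
      conn.execute('INSERT INTO image (docker_image_id, repository) VALUES (?, ?)',
                   (docker_image_id, repository_id))
  except sqlite3.IntegrityError:
    pass  # Row already exists; read it back below.
  return conn.execute('SELECT docker_image_id, repository FROM image WHERE docker_image_id = ?',
                      (docker_image_id,)).fetchone()

print(synthesize_image('abc123', 1))
print(synthesize_image('abc123', 1))  # Second call takes the IntegrityError path.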
@@ -246,13 +246,16 @@ def create_or_update_tag(namespace_name, repository_name, tag_name, tag_docker_i
   except Repository.DoesNotExist:
     raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))

+  return create_or_update_tag_for_repo(repo.id, tag_name, tag_docker_image_id, reversion=reversion)
+
+def create_or_update_tag_for_repo(repository_id, tag_name, tag_docker_image_id, reversion=False):
   now_ts = get_epoch_timestamp()

   with db_transaction():
     try:
       tag = db_for_update(_tag_alive(RepositoryTag
                                      .select()
-                                     .where(RepositoryTag.repository == repo,
+                                     .where(RepositoryTag.repository == repository_id,
                                             RepositoryTag.name == tag_name), now_ts)).get()
       tag.lifetime_end_ts = now_ts
       tag.save()

@@ -263,16 +266,17 @@ def create_or_update_tag(namespace_name, repository_name, tag_name, tag_docker_i
       raise StaleTagException(msg % tag_name)

     try:
-      image_obj = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
+      image_obj = Image.get(Image.docker_image_id == tag_docker_image_id,
+                            Image.repository == repository_id)
     except Image.DoesNotExist:
       raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)

     try:
-      return RepositoryTag.create(repository=repo, image=image_obj, name=tag_name,
+      return RepositoryTag.create(repository=repository_id, image=image_obj, name=tag_name,
                                   lifetime_start_ts=now_ts, reversion=reversion)
     except IntegrityError:
-      msg = 'Tag with name %s and lifetime start %s under repository %s/%s already exists'
-      raise TagAlreadyCreatedException(msg % (tag_name, now_ts, namespace_name, repository_name))
+      msg = 'Tag with name %s and lifetime start %s already exists'
+      raise TagAlreadyCreatedException(msg % (tag_name, now_ts))


 def create_temporary_hidden_tag(repo, image_obj, expiration_s):

@@ -504,13 +508,27 @@ def restore_tag_to_image(repo_obj, tag_name, docker_image_id):
   return existing_image


-def store_tag_manifest(namespace, repo_name, tag_name, docker_image_id, manifest_digest,
+def store_tag_manifest(namespace_name, repository_name, tag_name, docker_image_id, manifest_digest,
                        manifest_data, reversion=False):
   """ Stores a tag manifest for a specific tag name in the database. Returns the TagManifest
       object, as well as a boolean indicating whether the TagManifest was created.
   """
+  try:
+    repo = _basequery.get_existing_repository(namespace_name, repository_name)
+  except Repository.DoesNotExist:
+    raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
+
+  return store_tag_manifest_for_repo(repo.id, tag_name, docker_image_id, manifest_digest,
+                                     manifest_data, reversion=False)
+
+def store_tag_manifest_for_repo(repository_id, tag_name, docker_image_id, manifest_digest,
+                                manifest_data, reversion=False):
+  """ Stores a tag manifest for a specific tag name in the database. Returns the TagManifest
+      object, as well as a boolean indicating whether the TagManifest was created.
+  """
   with db_transaction():
-    tag = create_or_update_tag(namespace, repo_name, tag_name, docker_image_id, reversion=reversion)
+    tag = create_or_update_tag_for_repo(repository_id, tag_name, docker_image_id,
+                                        reversion=reversion)

     try:
       manifest = TagManifest.get(digest=manifest_digest)
@@ -56,7 +56,7 @@ def gen_basic_auth(username, password):


 def conduct_call(client, resource, url_for, method, params, body=None, expected_code=200,
-                 headers=None):
+                 headers=None, raw_body=None):
   """ Conducts a call to a Flask endpoint. """
   params = add_csrf_param(params)


@@ -68,6 +68,9 @@ def conduct_call(client, resource, url_for, method, params, body=None, expected_
   if body is not None:
     body = json.dumps(body)

+  if raw_body is not None:
+    body = raw_body
+
   # Required for anonymous calls to not exception.
   g.identity = Identity(None, 'none')
@@ -157,7 +157,7 @@ def _write_manifest(namespace_name, repo_name, manifest):
     raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'})

   # Ensure all the blobs in the manifest exist.
-  storage_map = model.lookup_blobs_by_digest(namespace_name, repo_name, manifest.checksums)
+  storage_map = model.lookup_blobs_by_digest(repo, manifest.checksums)
   for layer in manifest.layers:
     digest_str = str(layer.digest)
     if digest_str not in storage_map:

@@ -166,30 +166,35 @@ def _write_manifest(namespace_name, repo_name, manifest):
   # Lookup all the images and their parent images (if any) inside the manifest.
   # This will let us know which v1 images we need to synthesize and which ones are invalid.
   all_image_ids = list(manifest.parent_image_ids | manifest.image_ids)
-  images_map = model.get_docker_v1_metadata_by_image_id(namespace_name, repo_name, all_image_ids)
+  images_map = model.get_docker_v1_metadata_by_image_id(repo, all_image_ids)

   # Rewrite any v1 image IDs that do not match the checksum in the database.
   try:
     # TODO: make this batch and read the parent image from the previous iteration, rather than
     # reloading it.
     rewritten_images = list(manifest.rewrite_invalid_image_ids(images_map))
     for rewritten_image in rewritten_images:
-      model.synthesize_v1_image(
-        repo,
-        storage_map[rewritten_image.content_checksum],
-        rewritten_image.image_id,
-        rewritten_image.created,
-        rewritten_image.comment,
-        rewritten_image.command,
-        rewritten_image.compat_json,
-        rewritten_image.parent_image_id,)
+      if not rewritten_image.image_id in images_map:
+        model.synthesize_v1_image(
+          repo,
+          storage_map[rewritten_image.content_checksum],
+          rewritten_image.image_id,
+          rewritten_image.created,
+          rewritten_image.comment,
+          rewritten_image.command,
+          rewritten_image.compat_json,
+          rewritten_image.parent_image_id,
+        )
   except ManifestException as me:
     logger.exception("exception when rewriting v1 metadata")
     raise ManifestInvalid(detail={'message': 'failed synthesizing v1 metadata: %s' % me.message})

   # Store the manifest pointing to the tag.
   leaf_layer_id = rewritten_images[-1].image_id
-  newly_created = model.save_manifest(namespace_name, repo_name, manifest.tag, leaf_layer_id,
-                                      manifest.digest, manifest.bytes)
+  newly_created = model.save_manifest(repo, manifest.tag, leaf_layer_id, manifest.digest,
+                                      manifest.bytes)
   if newly_created:
     # TODO: make this batch
     labels = []
     for key, value in manifest.layers[-1].v1_metadata.labels.iteritems():
       media_type = 'application/json' if is_json(value) else 'text/plain'

@@ -219,11 +224,12 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest):
     'OK',
     status=202,
     headers={
-      'Docker-Content-Digest':
-        manifest.digest,
+      'Docker-Content-Digest': manifest.digest,
       'Location':
         url_for('v2.fetch_manifest_by_digest', repository='%s/%s' % (namespace_name, repo_name),
-                manifest_ref=manifest.digest),},)
+                manifest_ref=manifest.digest),
+    },
+  )


 @v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['DELETE'])
@@ -41,7 +41,7 @@ class BlobUpload(
   """


-class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations', 'cas_path'])):
+class Blob(namedtuple('Blob', ['id', 'uuid', 'digest', 'size', 'locations', 'cas_path'])):
   """
   Blob represents an opaque binary blob saved to the storage system.
   """

@@ -122,7 +122,7 @@ class DockerRegistryV2DataInterface(object):
     pass

   @abstractmethod
-  def get_docker_v1_metadata_by_image_id(self, namespace_name, repo_name, docker_image_ids):
+  def get_docker_v1_metadata_by_image_id(self, repository, docker_image_ids):
     """
     Returns a map of Docker V1 metadata for each given image ID, matched under the repository with
     the given namespace and name. Returns an empty map if the matching repository was not found.

@@ -156,8 +156,8 @@ class DockerRegistryV2DataInterface(object):
     pass

   @abstractmethod
-  def save_manifest(self, namespace_name, repo_name, tag_name, leaf_layer_docker_id,
-                    manifest_digest, manifest_bytes):
+  def save_manifest(self, repository, tag_name, leaf_layer_docker_id, manifest_digest,
+                    manifest_bytes):
     """
     Saves a manifest pointing to the given leaf image, with the given manifest, under the matching
     repository as a tag with the given name.

@@ -264,3 +264,9 @@ class DockerRegistryV2DataInterface(object):
     """
     pass

+  @abstractmethod
+  def lookup_blobs_by_digest(self, repository, digests):
+    """
+    Looks up all blobs with the matching digests under the given repository.
+    """
+    pass
@@ -75,15 +75,12 @@ class PreOCIModel(DockerRegistryV2DataInterface):
     except DataModelException:
       return None

-  def get_docker_v1_metadata_by_image_id(self, namespace_name, repo_name, docker_image_ids):
-    repo = model.repository.get_repository(namespace_name, repo_name)
-    if repo is None:
-      return {}
-
-    images_query = model.image.lookup_repository_images(repo, docker_image_ids)
+  def get_docker_v1_metadata_by_image_id(self, repository, docker_image_ids):
+    images_query = model.image.lookup_repository_images(repository.id, docker_image_ids)
     return {
-      image.docker_image_id: _docker_v1_metadata(namespace_name, repo_name, image)
-      for image in images_query}
+      image.docker_image_id: _docker_v1_metadata(repository.namespace_name, repository.name, image)
+      for image in images_query
+    }

   def get_parents_docker_v1_metadata(self, namespace_name, repo_name, docker_image_id):
     repo_image = model.image.get_repo_image(namespace_name, repo_name, docker_image_id)

@@ -104,29 +101,21 @@ class PreOCIModel(DockerRegistryV2DataInterface):

   def synthesize_v1_image(self, repository, storage, image_id, created, comment, command,
                           compat_json, parent_image_id):
-    repo = model.repository.get_repository(repository.namespace_name, repository.name)
-    if repo is None:
-      raise DataModelException('Unknown repository: %s/%s' % (repository.namespace_name,
-                                                              repository.name))
-
     parent_image = None
     if parent_image_id is not None:
-      parent_image = model.image.get_image(repo, parent_image_id)
+      parent_image = model.image.get_image(repository.id, parent_image_id)
       if parent_image is None:
         raise DataModelException('Unknown parent image: %s' % parent_image_id)

-    storage_obj = model.storage.get_storage_by_uuid(storage.uuid)
-    if storage_obj is None:
-      raise DataModelException('Unknown storage: %s' % storage.uuid)
-
-    repo_image = model.image.synthesize_v1_image(repo, storage_obj, image_id, created, comment,
-                                                 command, compat_json, parent_image)
-    return _docker_v1_metadata(repo.namespace_user.username, repo.name, repo_image)
+    repo_image = model.image.synthesize_v1_image(repository.id, storage.id, storage.size,
+                                                 image_id, created, comment, command, compat_json,
+                                                 parent_image)
+    return _docker_v1_metadata(repository.namespace_name, repository.name, repo_image)

-  def save_manifest(self, namespace_name, repo_name, tag_name, leaf_layer_docker_id,
-                    manifest_digest, manifest_bytes):
-    (_, newly_created) = model.tag.store_tag_manifest(
-      namespace_name, repo_name, tag_name, leaf_layer_docker_id, manifest_digest, manifest_bytes)
+  def save_manifest(self, repository, tag_name, leaf_layer_docker_id, manifest_digest,
+                    manifest_bytes):
+    (_, newly_created) = model.tag.store_tag_manifest_for_repo(
+      repository.id, tag_name, leaf_layer_docker_id, manifest_digest, manifest_bytes)
     return newly_created

   def repository_tags(self, namespace_name, repo_name, limit, offset):

@@ -208,15 +197,18 @@ class PreOCIModel(DockerRegistryV2DataInterface):
       namespace_name, repo_name, blob_digest, location_obj.id, blob_upload.byte_count,
       expiration_sec, blob_upload.uncompressed_byte_count)
     return Blob(
+      id=blob_record.id,
       uuid=blob_record.uuid,
       digest=blob_digest,
       size=blob_upload.byte_count,
       locations=[blob_upload.location_name],
-      cas_path=blob_record.cas_path)
+      cas_path=blob_record.cas_path
+    )

-  def lookup_blobs_by_digest(self, namespace_name, repo_name, digests):
+  def lookup_blobs_by_digest(self, repository, digests):
     def _blob_view(blob_record):
       return Blob(
+        id=blob_record.id,
         uuid=blob_record.uuid,
         digest=blob_record.content_checksum,
         size=blob_record.image_size,

@@ -224,21 +216,20 @@ class PreOCIModel(DockerRegistryV2DataInterface):
         locations=None, # Note: Locations is None in this case.
       )

-    repo = model.repository.get_repository(namespace_name, repo_name)
-    if repo is None:
-      return {}
-    query = model.storage.lookup_repo_storages_by_content_checksum(repo, digests)
+    query = model.storage.lookup_repo_storages_by_content_checksum(repository.id, digests)
     return {storage.content_checksum: _blob_view(storage) for storage in query}

   def get_blob_by_digest(self, namespace_name, repo_name, digest):
     try:
       blob_record = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
       return Blob(
+        id=blob_record.id,
         uuid=blob_record.uuid,
         digest=digest,
         size=blob_record.image_size,
         locations=blob_record.locations,
-        cas_path=blob_record.cas_path)
+        cas_path=blob_record.cas_path
+      )
     except model.BlobDoesNotExist:
       return None

@@ -286,7 +277,8 @@ def _docker_v1_metadata(namespace_name, repo_name, repo_image):
     comment=repo_image.comment,
     command=repo_image.command,
     # TODO: make sure this isn't needed anywhere, as it is expensive to lookup
-    parent_image_id=None,)
+    parent_image_id=None,
+  )


 def _repository_for_repo(repo):
endpoints/v2/test/test_manifest.py (new file, 54 lines)

@@ -0,0 +1,54 @@
+import hashlib
+import pytest
+import time
+
+from mock import patch
+
+from flask import url_for
+from playhouse.test_utils import count_queries
+
+from app import instance_keys, app as realapp
+from data import model
+from endpoints.test.shared import conduct_call
+from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
+from test.fixtures import *
+
+def test_e2e_query_count_manifest_norewrite(client, app):
+  tag_manifest = model.tag.load_tag_manifest('devtable', 'simple', 'latest')
+
+  params = {
+    'repository': 'devtable/simple',
+    'manifest_ref': tag_manifest.digest,
+  }
+
+  user = model.user.get_user('devtable')
+  access = [{
+    'type': 'repository',
+    'name': 'devtable/simple',
+    'actions': ['pull', 'push'],
+  }]
+
+  context, subject = build_context_and_subject(user=user)
+  token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600,
+                                instance_keys)
+
+  headers = {
+    'Authorization': 'Bearer %s' % token,
+  }
+
+  # Conduct a call to prime the instance key and other caches.
+  conduct_call(client, 'v2.write_manifest_by_digest', url_for, 'PUT', params, expected_code=202,
+               headers=headers, raw_body=tag_manifest.json_data)
+
+  timecode = time.time()
+  def get_time():
+    return timecode + 10
+
+  with patch('time.time', get_time):
+    # Necessary in order to have the tag updates not occurr in the same second, which is the
+    # granularity supported currently.
+    with count_queries() as counter:
+      conduct_call(client, 'v2.write_manifest_by_digest', url_for, 'PUT', params, expected_code=202,
+                   headers=headers, raw_body=tag_manifest.json_data)
+
+  assert counter.count < 15
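The new test bounds the end-to-end query count with playhouse's `count_queries` context manager, which collects the queries peewee logs while the block runs and exposes the total as `counter.count` once the block exits. A minimal, self-contained sketch of that technique against a throwaway in-memory model follows; the `Tag` model and the asserted bound here are made up for illustration and are not the Quay schema or test.

# Minimal sketch of bounding query counts with playhouse.test_utils.count_queries.
from peewee import SqliteDatabase, Model, CharField
from playhouse.test_utils import count_queries

db = SqliteDatabase(':memory:')

class Tag(Model):
  name = CharField()

  class Meta:
    database = db

db.connect()
db.create_tables([Tag])

with count_queries() as counter:
  Tag.create(name='latest')      # one INSERT
  Tag.get(Tag.name == 'latest')  # one SELECT

# counter.count is populated when the block exits; assert a bound to guard regressions.
print('queries issued: %d' % counter.count)
assert counter.count < 5  # tighten to the expected number in a real test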
@@ -165,7 +165,7 @@ class DockerSchema1Manifest(object):
     try:
       validate_schema(self._parsed, DockerSchema1Manifest.METASCHEMA)
     except ValidationError as ve:
-      raise MalformedSchema1Manifest('malformed manifest data: %s' % ve)
+      raise MalformedSchema1Manifest('manifest data does not match schema: %s' % ve)

     self._signatures = self._parsed[DOCKER_SCHEMA1_SIGNATURES_KEY]
     self._tag = self._parsed[DOCKER_SCHEMA1_REPO_TAG_KEY]