diff --git a/TODO.md b/TODO.md
index 53fbdc5b2..0cb79709b 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,15 +1,10 @@
-- Convert the flattened image generator to use the database ancestry instead of the json file
-- Convert verbs to load json from either db or storage
-- Convert verbs to work with v1 and cas layer storage locations
 - Fix all tests
 - Fix uncompressed size backfill
 - File issue to move queries out of uncompressed size backfill and use subquery random
 - Consider removing the new jwest dependency
 - Update the max fresh on registry tokens, 300s is not long enough to complete all registry actions
-- Fix the sizes stored in the db
 - Make sure we handle more of the v2 api than just what is required to push and pull
 - Handle registry API error conditions
 - Fill in the registry v2 methods on other storage engines
 - Write a script to backfill the json metadata
 - Verify the manifest, and throw the proper error if unverified
-- Convert uploads to get locked to a placement, e.g. once an upload starts, all communication goes through that replica
diff --git a/app.py b/app.py
index 36047425c..15eb0a27b 100644
--- a/app.py
+++ b/app.py
@@ -106,6 +106,11 @@ for handler in logging.getLogger().handlers:
 
 app.request_class = RequestWithId
 
+# Generate a secret key if none was specified.
+if app.config['SECRET_KEY'] is None:
+  logger.debug('Generating in-memory secret key')
+  app.config['SECRET_KEY'] = generate_secret_key()
+
 features.import_features(app.config)
 
 Principal(app, use_sessions=False)
@@ -144,11 +149,6 @@ database.configure(app.config)
 model.config.app_config = app.config
 model.config.store = storage
 
-# Generate a secret key if none was specified.
-if app.config['SECRET_KEY'] is None:
-  logger.debug('Generating in-memory secret key')
-  app.config['SECRET_KEY'] = generate_secret_key()
-
 @login_manager.user_loader
 def load_user(user_uuid):
   logger.debug('User loader loading deferred user with uuid: %s' % user_uuid)
diff --git a/conf/server-base.conf b/conf/server-base.conf
index 046121e24..185536110 100644
--- a/conf/server-base.conf
+++ b/conf/server-base.conf
@@ -30,10 +30,10 @@ location /realtime {
     proxy_request_buffering off;
 }
 
-# At the begining and end of a push/pull, /v1/repositories is hit by the Docker
+# At the beginning and end of a push/pull, (/v1/repositories|/v2/auth/) is hit by the Docker
 # client. By rate-limiting just this endpoint, we can avoid accidentally
 # blocking pulls/pushes for images with many layers.
-location /v1/repositories/ {
+location ~ ^/(v1/repositories|v2/auth)/ {
    proxy_buffering off;

    proxy_request_buffering off;

@@ -45,13 +45,14 @@ location /v1/repositories/ {
    limit_req zone=repositories burst=10;
}

-location /v1/ {
+location ~ ^/(v1|v2)/ {
    proxy_buffering off;

    proxy_request_buffering off;

    proxy_http_version 1.1;
    proxy_set_header Connection "";
+    proxy_set_header Host $host;

    proxy_pass http://registry_app_server;
    proxy_temp_path /tmp 1 2;
diff --git a/data/model/blob.py b/data/model/blob.py
index e97539b66..5820ba3b1 100644
--- a/data/model/blob.py
+++ b/data/model/blob.py
@@ -27,7 +27,7 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
   return found
 
 
-def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj,
+def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj, byte_count,
                                     link_expiration_s):
   """ Store a record of the blob and temporarily link it to the specified repository.
   """
@@ -36,9 +36,12 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
   repo = _basequery.get_existing_repository(namespace, repo_name)
   try:
     storage = ImageStorage.get(checksum=blob_digest)
+    storage.image_size = byte_count
+    storage.save()
+
     ImageStoragePlacement.get(storage=storage, location=location_obj)
   except ImageStorage.DoesNotExist:
-    storage = ImageStorage.create(checksum=blob_digest, uploading=False)
+    storage = ImageStorage.create(checksum=blob_digest, uploading=False, image_size=byte_count)
     ImageStoragePlacement.create(storage=storage, location=location_obj)
   except ImageStoragePlacement.DoesNotExist:
     ImageStoragePlacement.create(storage=storage, location=location_obj)
diff --git a/data/model/image.py b/data/model/image.py
index d1ec8563b..be1d5b589 100644
--- a/data/model/image.py
+++ b/data/model/image.py
@@ -5,7 +5,7 @@ from peewee import JOIN_LEFT_OUTER, fn
 from datetime import datetime
 
 from data.model import (DataModelException, db_transaction, _basequery, storage,
-                        InvalidImageException)
+                        InvalidImageException, config)
 from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
                            ImageStorageLocation, RepositoryPermission, db_for_update)
 
@@ -79,7 +79,10 @@ def get_repository_images_base(namespace_name, repository_name, query_modifier):
                  .where(Repository.name == repository_name, Namespace.username == namespace_name))
 
   query = query_modifier(query)
+  return _translate_placements_to_images_with_locations(query)
+
+def _translate_placements_to_images_with_locations(query):
   location_list = list(query)
 
   images = {}
@@ -113,7 +116,7 @@ def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
 
 def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
   def modify_query(query):
-    return query.where(Image.docker_image_id << docker_image_ids)
+    return query.where(Image.docker_image_id << list(docker_image_ids))
 
   return get_repository_images_base(namespace_name, repository_name, modify_query)
 
@@ -360,6 +363,44 @@ def get_repo_image_by_storage_checksum(namespace, repository_name, storage_check
     raise InvalidImageException(msg)
 
+
+def has_image_json(image):
+  """ Returns whether JSON definition data exists for the image. """
+  if image.v1_json_metadata:
+    return bool(image.v1_json_metadata)
+
+  store = config.store
+  return store.exists(image.storage.locations, store.image_json_path(image.storage.uuid))
+
+
+def get_image_json(image):
+  """ Returns the JSON definition data for the image. """
+  if image.v1_json_metadata:
+    return image.v1_json_metadata
+
+  store = config.store
+  return store.get_content(image.storage.locations, store.image_json_path(image.storage.uuid))
+
+
+def get_image_layers(image):
+  """ Returns a list of the full layers of an image, including the image itself, sorted from the
+      base image outward.
+  """
+  ancestors = image.ancestors.split('/')[1:-1]
+  image_ids = [ancestor_id for ancestor_id in ancestors if ancestor_id]
+  image_ids.append(str(image.id))
+
+  query = (ImageStoragePlacement
+           .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
+           .join(ImageStorageLocation)
+           .switch(ImageStoragePlacement)
+           .join(ImageStorage, JOIN_LEFT_OUTER)
+           .join(Image)
+           .where(Image.id << image_ids))
+
+  image_list = list(_translate_placements_to_images_with_locations(query))
+  image_list.sort(key=lambda image: image_ids.index(str(image.id)))
+  return image_list
+
+
 def synthesize_v1_image(namespace, repository_name, storage_checksum, docker_image_id,
                         created_date_str, comment, command, v1_json_metadata, parent_docker_id):
   """ Find an existing image with this docker image id, and if none exists, write one with the
diff --git a/data/model/storage.py b/data/model/storage.py
index 8d0c05bdf..c697c5de8 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -216,3 +216,15 @@ def get_repo_storage_by_checksum(namespace, repository_name, checksum):
     return _get_storage(filter_to_repo_and_checksum)
   except InvalidImageException:
     raise InvalidImageException('No storage found with checksum {0}'.format(checksum))
+
+
+def get_layer_path(storage_record):
+  """ Returns the path in the storage engine to the layer data referenced by the storage row. """
+  store = config.store
+  if not storage_record.cas_path:
+    return store.v1_image_layer_path(storage_record.uuid)
+
+  return store.blob_path(storage_record.checksum)
+
+
+
diff --git a/endpoints/api/image.py b/endpoints/api/image.py
index 0814e75e1..3f21a8018 100644
--- a/endpoints/api/image.py
+++ b/endpoints/api/image.py
@@ -12,7 +12,7 @@ from util.cache import cache_control_flask_restful
 
 
 def image_view(image, image_map, include_ancestors=True):
-  # TODO: Remove once we've migrated all storage data to the image records.
+  # TODO: Remove this once we've migrated all storage data to the image records.
   storage_props = image
   if image.storage and image.storage.id:
     storage_props = image.storage
diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py
index 2f9c9d5cb..42e63dc66 100644
--- a/endpoints/v1/registry.py
+++ b/endpoints/v1/registry.py
@@ -168,14 +168,13 @@ def put_image_layer(namespace, repository, image_id):
   repo_image = model.image.get_repo_image_extended(namespace, repository, image_id)
   try:
     logger.debug('Retrieving image data')
-    uuid = repo_image.storage.uuid
-    json_data = (repo_image.v1_json_metadata or
-                 store.get_content(repo_image.storage.locations, store.image_json_path(uuid)))
+    json_data = model.image.get_image_json(repo_image)
   except (IOError, AttributeError):
     logger.exception('Exception when retrieving image data')
     abort(404, 'Image %(image_id)s not found', issue='unknown-image', image_id=image_id)
 
+  uuid = repo_image.storage.uuid
   layer_path = store.v1_image_layer_path(uuid)
   logger.info('Storing layer at v1 path: %s', layer_path)
 
@@ -296,11 +295,8 @@ def put_image_checksum(namespace, repository, image_id):
   if not repo_image or not repo_image.storage:
     abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
 
-  uuid = repo_image.storage.uuid
-
   logger.debug('Looking up repo layer data')
-  if (repo_image.v1_json_metadata is None and
-      not store.exists(repo_image.storage.locations, store.image_json_path(uuid))):
+  if not model.image.has_image_json(repo_image):
     abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
 
   logger.debug('Marking image path')
@@ -353,9 +349,7 @@ def get_image_json(namespace, repository, image_id, headers):
 
   logger.debug('Looking up repo layer data')
   try:
-    uuid = repo_image.storage.uuid
-    data = (repo_image.v1_json_metadata or
-            store.get_content(repo_image.storage.locations, store.image_json_path(uuid)))
+    data = model.image.get_image_json(repo_image)
   except (IOError, AttributeError):
     flask_abort(404)
 
@@ -469,10 +463,7 @@ def put_image_json(namespace, repository, image_id):
     abort(400, 'Image %(image_id)s depends on non existing parent image %(parent_id)s',
           issue='invalid-request', image_id=image_id, parent_id=parent_id)
 
-  json_path = store.image_json_path(repo_image.storage.uuid)
-  if (not image_is_uploading(repo_image) and
-      (repo_image.v1_json_metadata is not None or
-       store.exists(repo_image.storage.locations, json_path))):
+  if not image_is_uploading(repo_image) and model.image.has_image_json(repo_image):
     exact_abort(409, 'Image already exists')
 
   set_uploading_flag(repo_image, True)
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index 013627d81..8e6ac3bb0 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -214,15 +214,19 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
 
 
 def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
+  # Verify that the digest's SHA matches that of the uploaded data.
   computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state)
   if not digest_tools.digests_equal(computed_digest, expected_digest):
     raise BlobUploadInvalid()
 
+  # Mark the blob as uploaded.
   final_blob_location = digest_tools.content_path(expected_digest)
   storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
                                   final_blob_location)
   model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
-                                             upload_obj.location,
+                                             upload_obj.location, upload_obj.byte_count,
                                              app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
+
+  # Delete the upload tracking row.
   upload_obj.delete_instance()
 
   response = make_response('', 201)
diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py
index 1b8cf1434..fcc7cd3d8 100644
--- a/endpoints/v2/manifest.py
+++ b/endpoints/v2/manifest.py
@@ -334,10 +334,7 @@ def __get_and_backfill_image_metadata(image):
   if image_metadata is None:
     logger.warning('Loading metadata from storage for image id: %s', image.id)
 
-    metadata_path = storage.image_json_path(image.storage.uuid)
-    image_metadata = storage.get_content(image.storage.locations, metadata_path)
-    image.v1_json_metadata = image_metadata
-
+    image.v1_json_metadata = model.image.get_image_json(image)
     logger.info('Saving backfilled metadata for image id: %s', image.id)
     image.save()
diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py
index 7c05e10a0..76f11b042 100644
--- a/endpoints/v2/v2auth.py
+++ b/endpoints/v2/v2auth.py
@@ -54,6 +54,8 @@ def generate_registry_jwt():
   logger.debug('Scope request: %s', scope_param)
 
   user = get_authenticated_user()
+  if user is None:
+    abort(404)
 
   access = []
   if scope_param is not None:
diff --git a/endpoints/verbs.py b/endpoints/verbs.py
index be0067d1d..560acbdd6 100644
--- a/endpoints/verbs.py
+++ b/endpoints/verbs.py
@@ -23,16 +23,11 @@ logger = logging.getLogger(__name__)
 
 
 def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json,
-                 image_id_list):
+                 image_list):
   store = Storage(app)
 
-  # For performance reasons, we load the full image list here, cache it, then disconnect from
-  # the database.
-  with database.UseThenDisconnect(app.config):
-    image_list = list(model.image.get_matching_repository_images(namespace, repository,
-                                                                  image_id_list))
-
-  image_list.sort(key=lambda image: image_id_list.index(image.docker_image_id))
+  def get_image_json(image):
+    return json.loads(model.image.get_image_json(image))
 
   def get_next_image():
     for current_image in image_list:
@@ -40,7 +35,7 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
 
   def get_next_layer():
     for current_image_entry in image_list:
-      current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
+      current_image_path = model.storage.get_layer_path(current_image_entry.storage)
       current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
                                                     current_image_path)
 
@@ -49,7 +44,7 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
       yield current_image_stream
 
   stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
-                                  get_next_image, get_next_layer)
+                                  get_next_image, get_next_layer, get_image_json)
 
   return stream.read
 
@@ -88,10 +83,14 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location
 
   queue_file.add_exception_handler(handle_exception)
 
-  image_path = store.image_layer_path(linked_storage_uuid)
+  logger.debug('Starting write of synthesized image')
+
+  image_path = store.v1_image_layer_path(linked_storage_uuid)
   store.stream_write(linked_locations, image_path, queue_file)
   queue_file.close()
 
+  logger.debug('Finished writing synthesized image')
+
   if not queue_file.raised_exception:
     # Setup the database (since this is a new process) and then disconnect immediately
     # once the operation completes.
@@ -119,13 +118,10 @@ def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
     abort(404)
 
   # If there is a data checker, call it first.
-  uuid = repo_image.storage.uuid
   image_json = None
   if checker is not None:
-    image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
-    image_json = json.loads(image_json_data)
-
+    image_json = json.loads(model.image.get_image_json(repo_image))
     if not checker(image_json):
       logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb)
       abort(404)
 
@@ -170,9 +166,9 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
   derived = model.storage.find_or_create_derived_storage(repo_image.storage, verb,
                                                          store.preferred_locations[0])
 
   if not derived.uploading:
     logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
-    derived_layer_path = store.image_layer_path(derived.uuid)
+    derived_layer_path = model.storage.get_layer_path(derived)
     download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
     if download_url:
       logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
 
@@ -184,17 +180,14 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
     logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
     return send_file(store.stream_read_file(derived.locations, derived_layer_path))
 
-  # Load the ancestry for the image.
-  uuid = repo_image.storage.uuid
+  # Load the full image list for the image.
+  full_image_list = model.image.get_image_layers(repo_image)
 
   logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
-  ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
-  full_image_list = json.loads(ancestry_data)
 
   # Load the image's JSON layer.
   if not image_json:
-    image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
-    image_json = json.loads(image_json_data)
+    image_json = json.loads(model.image.get_image_json(repo_image))
 
   # Calculate a synthetic image ID.
   synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest()
diff --git a/formats/aci.py b/formats/aci.py
index 718c35445..11a7a06ef 100644
--- a/formats/aci.py
+++ b/formats/aci.py
@@ -10,7 +10,7 @@ class ACIImage(TarImageFormatter):
   """
 
   def stream_generator(self, namespace, repository, tag, synthetic_image_id,
-                       layer_json, get_image_iterator, get_layer_iterator):
+                       layer_json, get_image_iterator, get_layer_iterator, get_image_json):
     # ACI Format (.tar):
     #   manifest - The JSON manifest
     #   rootfs - The root file system
diff --git a/formats/squashed.py b/formats/squashed.py
index b26a069fd..c04b4aa7e 100644
--- a/formats/squashed.py
+++ b/formats/squashed.py
@@ -19,8 +19,13 @@ class SquashedDockerImage(TarImageFormatter):
       command.
   """
 
+  # TODO(jschorr): Change this multiplier to reflect the 99th percentile of the actual difference
+  # between the uncompressed image size and the Size reported by Docker.
+  SIZE_MULTIPLIER = 2
+
   def stream_generator(self, namespace, repository, tag, synthetic_image_id,
-                       layer_json, get_image_iterator, get_layer_iterator):
+                       layer_json, get_image_iterator, get_layer_iterator, get_image_json):
+
     # Docker import V1 Format (.tar):
     #  repositories - JSON file containing a repo -> tag -> image map
     #  {image ID folder}:
@@ -52,7 +57,14 @@ class SquashedDockerImage(TarImageFormatter):
 
     # Yield the merged layer data's header.
     estimated_file_size = 0
     for image in get_image_iterator():
-      estimated_file_size += image.storage.uncompressed_size
+      # In V1 we have the actual uncompressed size stored in the database, which is needed
+      # for backwards compatibility with older versions of Docker.
+      # In V2, we fall back to the Size reported in the image JSON.
+      if image.storage.uncompressed_size:
+        estimated_file_size += image.storage.uncompressed_size
+      else:
+        image_json = get_image_json(image)
+        estimated_file_size += image_json.get('Size', 0) * SquashedDockerImage.SIZE_MULTIPLIER
 
     yield self.tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
 
@@ -65,7 +77,8 @@ class SquashedDockerImage(TarImageFormatter):
     # If the yielded size is more than the estimated size (which is unlikely but possible), then
     # raise an exception since the tar header will be wrong.
     if yielded_size > estimated_file_size:
-      raise FileEstimationException()
+      message = "Expected %s bytes, found %s bytes" % (estimated_file_size, yielded_size)
+      raise FileEstimationException(message)
 
     # If the yielded size is less than the estimated size (which is likely), fill the rest with
     # zeros.
diff --git a/formats/tarimageformatter.py b/formats/tarimageformatter.py
index 361f6256d..38d3fb3ab 100644
--- a/formats/tarimageformatter.py
+++ b/formats/tarimageformatter.py
@@ -5,16 +5,17 @@ class TarImageFormatter(object):
   """ Base class for classes which produce a TAR containing image and layer data. """
 
   def build_stream(self, namespace, repository, tag, synthetic_image_id, layer_json,
-                   get_image_iterator, get_layer_iterator):
+                   get_image_iterator, get_layer_iterator, get_image_json):
     """ Builds and streams a synthetic .tar.gz that represents the formatted TAR created by this
        class's implementation.
     """
     return GzipWrap(self.stream_generator(namespace, repository, tag,
                                           synthetic_image_id, layer_json,
-                                          get_image_iterator, get_layer_iterator))
+                                          get_image_iterator, get_layer_iterator,
+                                          get_image_json))
 
   def stream_generator(self, namespace, repository, tag, synthetic_image_id,
-                       layer_json, get_image_iterator, get_layer_iterator):
+                       layer_json, get_image_iterator, get_layer_iterator, get_image_json):
     raise NotImplementedError
 
   def tar_file(self, name, contents):
diff --git a/requirements-nover.txt b/requirements-nover.txt
index de0dd0763..41483e4e2 100644
--- a/requirements-nover.txt
+++ b/requirements-nover.txt
@@ -53,3 +53,4 @@ python-keystoneclient
 Flask-Testing
 pyjwt
 toposort
+pyjwkest
diff --git a/requirements.txt b/requirements.txt
index 0df1eaa41..ae7a83313 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -57,6 +57,7 @@ pyasn1==0.1.8
 pycparser==2.14
 pycrypto==2.6.1
 pygpgme==0.3
+pyjwkest==1.0.3
 PyJWT==1.3.0
 PyMySQL==0.6.6
 pyOpenSSL==0.15.1
diff --git a/storage/local.py b/storage/local.py
index 6e18004e0..9495aed9c 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -61,9 +61,11 @@ class LocalStorage(BaseStorageV2):
                    num_bytes < 0 copy until the stream ends.
    """
    bytes_copied = 0
-    bytes_remaining = num_bytes
-    while bytes_remaining > 0 or num_bytes < 0:
-      size_to_read = min(bytes_remaining, self.buffer_size)
+    while bytes_copied < num_bytes or num_bytes < 0:
+      size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
+      if size_to_read < 0:
+        size_to_read = self.buffer_size
+
      try:
        buf = in_fp.read(size_to_read)
        if not buf:
diff --git a/util/registry/gzipwrap.py b/util/registry/gzipwrap.py
index 604fa343d..685e5bb13 100644
--- a/util/registry/gzipwrap.py
+++ b/util/registry/gzipwrap.py
@@ -11,6 +11,9 @@ class GzipWrap(object):
     self.is_done = False
 
   def read(self, size=-1):
+    if size is None or size < 0:
+      raise Exception('GzipWrap.read requires a bounded size; unbounded reads perform poorly')
+
     # If the buffer already has enough bytes, then simply pop them off of
     # the beginning and return them.
     if len(self.buffer) >= size or self.is_done: