diff --git a/endpoints/v1/models_interface.py b/endpoints/v1/models_interface.py index a4d4455c9..d32a00931 100644 --- a/endpoints/v1/models_interface.py +++ b/endpoints/v1/models_interface.py @@ -79,9 +79,9 @@ class DockerRegistryV1DataInterface(object): pass @abstractmethod - def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size): + def update_image_blob(self, namespace_name, repo_name, image_id, blob): """ - Updates the sizing information for the image with the given V1 Docker ID. + Updates the blob for the image with the given V1 Docker ID. """ pass diff --git a/endpoints/v1/models_pre_oci.py b/endpoints/v1/models_pre_oci.py index 0b11eccdb..badaffce0 100644 --- a/endpoints/v1/models_pre_oci.py +++ b/endpoints/v1/models_pre_oci.py @@ -1,5 +1,6 @@ from app import app, storage as store from data import model +from data.database import db_transaction from endpoints.v1.models_interface import DockerRegistryV1DataInterface, Repository from util.morecollections import AttrDict @@ -56,8 +57,8 @@ class PreOCIModel(DockerRegistryV1DataInterface): if repo_image is None or repo_image.storage is None: return + assert repo_image.storage.content_checksum == content_checksum with model.db_transaction(): - repo_image.storage.content_checksum = content_checksum repo_image.v1_checksum = checksum repo_image.storage.save() repo_image.save() @@ -77,9 +78,19 @@ class PreOCIModel(DockerRegistryV1DataInterface): repo_image.storage.save() return repo_image.storage - def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size): - model.storage.set_image_storage_metadata(image_id, namespace_name, repo_name, size, - uncompressed_size) + def update_image_blob(self, namespace_name, repo_name, image_id, blob): + # Retrieve the existing image storage record and replace it with that given by the blob. + repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id) + if repo_image is None or repo_image.storage is None or not repo_image.storage.uploading: + return False + + with db_transaction(): + existing_storage = repo_image.storage + + repo_image.storage = blob._db_id + repo_image.save() + + existing_storage.delete_instance(recursive=True) def get_image_size(self, namespace_name, repo_name, image_id): repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 330774df6..749babd51 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -12,6 +12,8 @@ from auth.auth_context import get_authenticated_user from auth.decorators import extract_namespace_repo_from_session, process_auth from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) from data import model, database +from data.registry_model import registry_model +from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException from digest import checksums from endpoints.v1 import v1_bp from endpoints.v1.models_pre_oci import pre_oci_model as model @@ -26,14 +28,6 @@ from util.registry.torrent import PieceHasher logger = logging.getLogger(__name__) -def _finish_image(namespace, repository, image_id): - # Checksum is ok, we remove the marker - blob_ref = model.update_image_uploading(namespace, repository, image_id, False) - - # Send a job to the work queue to replicate the image layer. - queue_storage_replication(namespace, blob_ref) - - def require_completion(f): """This make sure that the image push correctly finished.""" @@ -183,51 +177,44 @@ def put_image_layer(namespace, repository, image_id): # encoding (Gunicorn) input_stream = request.environ['wsgi.input'] - # Create a socket reader to read the input stream containing the layer data. - sr = SocketReader(input_stream) + repository_ref = registry_model.lookup_repository(namespace, repository) + if repository_ref is None: + abort(404) + + expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] + settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'], + bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'], + committed_blob_expiration=expiration_sec) + + extra_handlers = [] # Add a handler that copies the data into a temp file. This is used to calculate the tarsum, # which is only needed for older versions of Docker. requires_tarsum = session.get('checksum_format') == 'tarsum' if requires_tarsum: tmp, tmp_hndlr = store.temp_store_handler() - sr.add_handler(tmp_hndlr) + extra_handlers.append(tmp_hndlr) - # Add a handler to compute the compressed and uncompressed sizes of the layer. - size_info, size_hndlr = gzipstream.calculate_size_handler() - sr.add_handler(size_hndlr) - - # Add a handler to hash the chunks of the upload for torrenting - piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE']) - sr.add_handler(piece_hasher.update) - - # Add a handler which computes the checksum. + # Add a handler which computes the simple Docker V1 checksum. h, sum_hndlr = checksums.simple_checksum_handler(v1_metadata.compat_json) - sr.add_handler(sum_hndlr) + extra_handlers.append(sum_hndlr) - # Add a handler which computes the content checksum only - ch, content_sum_hndlr = checksums.content_checksum_handler() - sr.add_handler(content_sum_hndlr) + uploaded_blob = None + try: + with upload_blob(repository_ref, store, settings, + extra_blob_stream_handlers=extra_handlers) as manager: + manager.upload_chunk(app.config, input_stream) + uploaded_blob = manager.commit_to_blob(app.config) + except BlobUploadException: + logger.exception('Exception when writing image data') + abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id) - # Stream write the data to storage. - locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id) - with database.CloseForLongOperation(app.config): - try: - start_time = time() - store.stream_write(locations, path, sr) - metric_queue.chunk_size.Observe(size_info.compressed_size, labelvalues=[list(locations)[0]]) - metric_queue.chunk_upload_time.Observe(time() - start_time, labelvalues=[list(locations)[0]]) - except IOError: - logger.exception('Exception when writing image data') - abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id) + # Save the blob for the image. + model.update_image_blob(namespace, repository, image_id, uploaded_blob) - # Save the size of the image. - model.update_image_sizes(namespace, repository, image_id, size_info.compressed_size, - size_info.uncompressed_size) - - # Save the BitTorrent pieces. - model.create_bittorrent_pieces(namespace, repository, image_id, - piece_hasher.final_piece_hashes()) + # Send a job to the work queue to replicate the image layer. + # TODO: move this into a better place. + queue_storage_replication(namespace, uploaded_blob) # Append the computed checksum. csums = [] @@ -245,7 +232,7 @@ def put_image_layer(namespace, repository, image_id): # We don't have a checksum stored yet, that's fine skipping the check. # Not removing the mark though, image is not downloadable yet. session['checksum'] = csums - session['content_checksum'] = 'sha256:{0}'.format(ch.hexdigest()) + session['content_checksum'] = uploaded_blob.digest return make_response('true', 200) # We check if the checksums provided matches one the one we computed @@ -254,9 +241,6 @@ def put_image_layer(namespace, repository, image_id): abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Mark the image as uploaded. - _finish_image(namespace, repository, image_id) - return make_response('true', 200) @@ -306,20 +290,12 @@ def put_image_checksum(namespace, repository, image_id): if not v1_metadata.compat_json: abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id) - logger.debug('Marking image path') - if not model.is_image_uploading(namespace, repository, image_id): - abort(409, 'Cannot set checksum for image %(image_id)s', issue='image-write-error', - image_id=image_id) - - logger.debug('Storing image and content checksums') - + logger.debug('Storing image and checksum') content_checksum = session.get('content_checksum', None) checksum_parts = checksum.split(':') if len(checksum_parts) != 2: abort(400, 'Invalid checksum format') - model.store_docker_v1_checksums(namespace, repository, image_id, checksum, content_checksum) - if checksum not in session.get('checksum', []): logger.debug('session checksums: %s', session.get('checksum', [])) logger.debug('client supplied checksum: %s', checksum) @@ -327,9 +303,6 @@ def put_image_checksum(namespace, repository, image_id): abort(400, 'Checksum mismatch for image: %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Mark the image as uploaded. - _finish_image(namespace, repository, image_id) - return make_response('true', 200)