import logging import time from contextlib import contextmanager from collections import namedtuple import bitmath import resumablehashlib from data.registry_model import registry_model from data.database import CloseForLongOperation, db_transaction from digest import digest_tools from util.registry.filelike import wrap_with_handler, StreamSlice from util.registry.gzipstream import calculate_size_handler from util.registry.torrent import PieceHasher logger = logging.getLogger(__name__) BLOB_CONTENT_TYPE = 'application/octet-stream' class BlobUploadException(Exception): """ Base for all exceptions raised when uploading blobs. """ class BlobDigestMismatchException(BlobUploadException): """ Exception raised if the digest requested does not match that of the contents uploaded. """ class BlobTooLargeException(BlobUploadException): """ Exception raised if the data uploaded exceeds the maximum_blob_size. """ def __init__(self, uploaded, max_allowed): super(BlobTooLargeException, self).__init__() self.uploaded = uploaded self.max_allowed = max_allowed BlobUploadSettings = namedtuple('BlobUploadSettings', ['maximum_blob_size', 'bittorrent_piece_size', 'committed_blob_expiration']) def create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers=None): """ Creates a new blob upload in the specified repository and returns a manager for interacting with that upload. Returns None if a new blob upload could not be started. """ location_name = storage.preferred_locations[0] new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) blob_upload = registry_model.create_blob_upload(repository_ref, new_upload_uuid, location_name, upload_metadata) if blob_upload is None: return None return _BlobUploadManager(repository_ref, blob_upload, settings, storage, extra_blob_stream_handlers) def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settings): """ Retrieves the manager for an in-progress blob upload with the specified ID under the given repository or None if none. """ blob_upload = registry_model.lookup_blob_upload(repository_ref, blob_upload_id) if blob_upload is None: return None return _BlobUploadManager(repository_ref, blob_upload, settings, storage) @contextmanager def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None): """ Starts a new blob upload in the specified repository and yields a manager for interacting with that upload. When the context manager completes, the blob upload is deleted, whether committed to a blob or not. Yields None if a blob upload could not be started. """ created = create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers) if not created: yield None return try: yield created except Exception as ex: logger.exception('Exception when uploading blob `%s`', created.blob_upload_id) raise ex finally: # Cancel the upload if something went wrong or it was not commit to a blob. if created.committed_blob is None: created.cancel_upload() class _BlobUploadManager(object): """ Defines a helper class for easily interacting with blob uploads in progress, including handling of database and storage calls. """ def __init__(self, repository_ref, blob_upload, settings, storage, extra_blob_stream_handlers=None): self.repository_ref = repository_ref self.blob_upload = blob_upload self.settings = settings self.storage = storage self.extra_blob_stream_handlers = extra_blob_stream_handlers self.committed_blob = None @property def blob_upload_id(self): """ Returns the unique ID for the blob upload. """ return self.blob_upload.upload_id def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None): """ Uploads a chunk of data found in the given input file-like interface. start_offset and length are optional and should match a range header if any was given. If metric_queue is given, the upload time and chunk size are written into the metrics in the queue. Returns the total number of bytes uploaded after this upload has completed. Raises a BlobUploadException if the upload failed. """ assert start_offset is not None assert length is not None if start_offset > 0 and start_offset > self.blob_upload.byte_count: logger.error('start_offset provided greater than blob_upload.byte_count') return None # Ensure that we won't go over the allowed maximum size for blobs. max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) uploaded = bitmath.Byte(length + start_offset) if length > -1 and uploaded > max_blob_size: raise BlobTooLargeException(uploaded=uploaded.bytes, max_allowed=max_blob_size.bytes) location_set = {self.blob_upload.location_name} upload_error = None with CloseForLongOperation(app_config): if start_offset > 0 and start_offset < self.blob_upload.byte_count: # Skip the bytes which were received on a previous push, which are already stored and # included in the sha calculation overlap_size = self.blob_upload.byte_count - start_offset input_fp = StreamSlice(input_fp, overlap_size) # Update our upload bounds to reflect the skipped portion of the overlap start_offset = self.blob_upload.byte_count length = max(length - overlap_size, 0) # We use this to escape early in case we have already processed all of the bytes the user # wants to upload. if length == 0: return self.blob_upload.byte_count input_fp = wrap_with_handler(input_fp, self.blob_upload.sha_state.update) if self.extra_blob_stream_handlers: for handler in self.extra_blob_stream_handlers: input_fp = wrap_with_handler(input_fp, handler) # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have # already calculated hash data for the previous chunk(s). piece_hasher = None if self.blob_upload.chunk_count == 0 or self.blob_upload.piece_sha_state: initial_sha1_value = self.blob_upload.piece_sha_state or resumablehashlib.sha1() initial_sha1_pieces_value = self.blob_upload.piece_hashes or '' piece_hasher = PieceHasher(self.settings.bittorrent_piece_size, start_offset, initial_sha1_pieces_value, initial_sha1_value) input_fp = wrap_with_handler(input_fp, piece_hasher.update) # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the # stream so we can determine the uncompressed size. We'll throw out this data if another chunk # comes in, but in the common case the docker client only sends one chunk. size_info = None if start_offset == 0 and self.blob_upload.chunk_count == 0: size_info, fn = calculate_size_handler() input_fp = wrap_with_handler(input_fp, fn) start_time = time.time() length_written, new_metadata, upload_error = self.storage.stream_upload_chunk( location_set, self.blob_upload.upload_id, start_offset, length, input_fp, self.blob_upload.storage_metadata, content_type=BLOB_CONTENT_TYPE, ) if upload_error is not None: logger.error('storage.stream_upload_chunk returned error %s', upload_error) raise BlobUploadException(upload_error) # Update the chunk upload time metric. if metric_queue is not None: metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[ length_written, list(location_set)[0]]) # Ensure we have not gone beyond the max layer size. new_blob_bytes = self.blob_upload.byte_count + length_written new_blob_size = bitmath.Byte(new_blob_bytes) if new_blob_size > max_blob_size: raise BlobTooLargeException(uploaded=new_blob_size, max_allowed=max_blob_size.bytes) # If we determined an uncompressed size and this is the first chunk, add it to the blob. # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. uncompressed_byte_count = self.blob_upload.uncompressed_byte_count if size_info is not None and self.blob_upload.chunk_count == 0 and size_info.is_valid: uncompressed_byte_count = size_info.uncompressed_size elif length_written > 0: # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't # know the uncompressed size. uncompressed_byte_count = None piece_hashes = None piece_sha_state = None if piece_hasher is not None: piece_hashes = piece_hasher.piece_hashes piece_sha_state = piece_hasher.hash_fragment self.blob_upload = registry_model.update_blob_upload(self.blob_upload, uncompressed_byte_count, piece_hashes, piece_sha_state, new_metadata, new_blob_bytes, self.blob_upload.chunk_count + 1, self.blob_upload.sha_state) if self.blob_upload is None: raise BlobUploadException('Could not complete upload of chunk') return new_blob_bytes def cancel_upload(self): """ Cancels the blob upload, deleting any data uploaded and removing the upload itself. """ # Tell storage to cancel the chunked upload, deleting its contents. self.storage.cancel_chunked_upload({self.blob_upload.location_name}, self.blob_upload.upload_id, self.blob_upload.storage_metadata) # Remove the blob upload record itself. registry_model.delete_blob_upload(self.blob_upload) def commit_to_blob(self, app_config, expected_digest=None): """ Commits the blob upload to a blob under the repository. The resulting blob will be marked to not be GCed for some period of time (as configured by `committed_blob_expiration`). If expected_digest is specified, the content digest of the data uploaded for the blob is compared to that given and, if it does not match, a BlobDigestMismatchException is raised. The digest given must be of type `Digest` and not a string. """ # Compare the content digest. if expected_digest is not None: self._validate_digest(expected_digest) # Finalize the storage. storage_already_existed = self._finalize_blob_storage(app_config) # Convert the upload to a blob. computed_digest_str = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) with db_transaction(): blob = registry_model.commit_blob_upload(self.blob_upload, computed_digest_str, self.settings.committed_blob_expiration) if blob is None: return None # Save torrent hash information (if available). if self.blob_upload.piece_sha_state is not None and not storage_already_existed: piece_bytes = self.blob_upload.piece_hashes + self.blob_upload.piece_sha_state.digest() registry_model.set_torrent_info(blob, self.settings.bittorrent_piece_size, piece_bytes) self.committed_blob = blob return blob def _validate_digest(self, expected_digest): """ Verifies that the digest's SHA matches that of the uploaded data. """ computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) if not digest_tools.digests_equal(computed_digest, expected_digest): logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s', self.blob_upload.upload_id, expected_digest, computed_digest) raise BlobDigestMismatchException() def _finalize_blob_storage(self, app_config): """ When an upload is successful, this ends the uploading process from the storage's perspective. Returns True if the blob already existed. """ computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) final_blob_location = digest_tools.content_path(computed_digest) # Close the database connection before we perform this operation, as it can take a while # and we shouldn't hold the connection during that time. with CloseForLongOperation(app_config): # Move the storage into place, or if this was a re-upload, cancel it already_existed = self.storage.exists({self.blob_upload.location_name}, final_blob_location) if already_existed: # It already existed, clean up our upload which served as proof that the # uploader had the blob. self.storage.cancel_chunked_upload({self.blob_upload.location_name}, self.blob_upload.upload_id, self.blob_upload.storage_metadata) else: # We were the first ones to upload this image (at least to this location) # Let's copy it into place self.storage.complete_chunked_upload({self.blob_upload.location_name}, self.blob_upload.upload_id, final_blob_location, self.blob_upload.storage_metadata) return already_existed