diff --git a/data/database.py b/data/database.py
index 297656c6c..b4d7ba986 100644
--- a/data/database.py
+++ b/data/database.py
@@ -784,6 +784,8 @@ class BlobUpload(BaseModel):
   sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
   location = ForeignKeyField(ImageStorageLocation)
   storage_metadata = JSONField(null=True, default={})
+  chunk_count = IntegerField(default=0)
+  uncompressed_byte_count = IntegerField(null=True)
 
   class Meta:
     database = db
diff --git a/data/migrations/versions/403d02fea323_add_new_blobupload_columns.py b/data/migrations/versions/403d02fea323_add_new_blobupload_columns.py
new file mode 100644
index 000000000..cb5b74317
--- /dev/null
+++ b/data/migrations/versions/403d02fea323_add_new_blobupload_columns.py
@@ -0,0 +1,28 @@
+"""Add new blobupload columns
+
+Revision ID: 403d02fea323
+Revises: 10b999e8db1f
+Create Date: 2015-11-30 14:25:46.822730
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '403d02fea323'
+down_revision = '10b999e8db1f'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+def upgrade(tables):
+  ### commands auto generated by Alembic - please adjust! ###
+  op.add_column('blobupload', sa.Column('chunk_count', sa.Integer(), nullable=False))
+  op.add_column('blobupload', sa.Column('uncompressed_byte_count', sa.Integer(), nullable=True))
+  ### end Alembic commands ###
+
+
+def downgrade(tables):
+  ### commands auto generated by Alembic - please adjust! ###
+  op.drop_column('blobupload', 'uncompressed_byte_count')
+  op.drop_column('blobupload', 'chunk_count')
+  ### end Alembic commands ###
diff --git a/data/model/blob.py b/data/model/blob.py
index b05ce6100..263978ec1 100644
--- a/data/model/blob.py
+++ b/data/model/blob.py
@@ -26,7 +26,7 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest):
 
 
 def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj, byte_count,
-                                    link_expiration_s):
+                                    link_expiration_s, uncompressed_byte_count=None):
   """ Store a record of the blob and temporarily link it to the specified repository. """
   random_image_name = str(uuid4())
 
@@ -35,6 +35,7 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
   try:
     storage = ImageStorage.get(content_checksum=blob_digest)
     storage.image_size = byte_count
+    storage.uncompressed_size = uncompressed_byte_count
     storage.save()
 
     ImageStoragePlacement.get(storage=storage, location=location_obj)
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index e7525c8a9..0d8dc4a4e 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -13,6 +13,7 @@ from auth.jwt_auth import process_jwt_auth
 from endpoints.decorators import anon_protect
 from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
+from util.registry.gzipstream import calculate_size_handler
 from storage.basestorage import InvalidChunkException
 
 
@@ -220,6 +221,14 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
 
   input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
+  # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
+  # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
+  # comes in, but in the common case Docker only sends one chunk.
+  size_info = None
+  if start_offset == 0 and found.chunk_count == 0:
+    size_info, fn = calculate_size_handler()
+    input_fp = wrap_with_handler(input_fp, fn)
+
   try:
     length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
                                                                       start_offset, length, input_fp,
@@ -228,8 +237,18 @@
   except InvalidChunkException:
     _range_not_satisfiable(found.byte_count)
 
+  # If we determined an uncompressed size and this is the first chunk, add it to the blob.
+  # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
+  if size_info is not None and found.chunk_count == 0 and size_info.is_valid:
+    found.uncompressed_byte_count = size_info.uncompressed_size
+  elif length_written > 0:
+    # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
+    # know the uncompressed size.
+    found.uncompressed_byte_count = None
+
   found.storage_metadata = new_metadata
   found.byte_count += length_written
+  found.chunk_count += 1
   return found, error
 
 
@@ -257,7 +276,8 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
   # Mark the blob as uploaded.
   model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
                                              upload_obj.location, upload_obj.byte_count,
-                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
+                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
+                                             upload_obj.uncompressed_byte_count)
 
   # Delete the upload tracking row.
   upload_obj.delete_instance()
diff --git a/test/data/test.db b/test/data/test.db
index 335b28e34..09d116ec4 100644
Binary files a/test/data/test.db and b/test/data/test.db differ
diff --git a/util/registry/gzipstream.py b/util/registry/gzipstream.py
index 0f9ce1e6b..3a3f28542 100644
--- a/util/registry/gzipstream.py
+++ b/util/registry/gzipstream.py
@@ -16,6 +16,7 @@ class SizeInfo(object):
   def __init__(self):
     self.uncompressed_size = 0
     self.compressed_size = 0
+    self.is_valid = True
 
 def calculate_size_handler():
   """ Returns an object and a SocketReader handler. The handler will gunzip the data it receives,
@@ -23,17 +24,26 @@
   """
 
   size_info = SizeInfo()
-
   decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
 
   def fn(buf):
+    if not size_info.is_valid:
+      return
+
     # Note: We set a maximum CHUNK_SIZE to prevent the decompress from taking too much
     # memory. As a result, we have to loop until the unconsumed tail is empty.
     current_data = buf
     size_info.compressed_size += len(current_data)
 
     while len(current_data) > 0:
-      size_info.uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
+      try:
+        size_info.uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
+      except:
+        # The gzip stream is not valid for some reason.
+        size_info.uncompressed_size = None
+        size_info.is_valid = False
+        return
+
       current_data = decompressor.unconsumed_tail
 
       # Make sure we allow the scheduler to do other work if we get stuck in this tight loop.
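Below is a small standalone sketch (not part of the patch) of the streaming-decompression technique that calculate_size_handler relies on: each compressed chunk is fed through a zlib decompressor opened with MAX_WBITS | 32 so the gzip header is auto-detected, the amount inflated per call is capped, and the output lengths are totaled. The estimate_uncompressed_size helper, the 4 KiB chunking, and the demo under __main__ are illustrative assumptions; only the zlib calls mirror the patched util/registry/gzipstream.py.

import gzip
import io
import zlib

# Same trick as ZLIB_GZIP_WINDOW in util/registry/gzipstream.py: MAX_WBITS | 32 tells zlib to
# auto-detect the gzip/zlib header.
ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32

# Cap on how many bytes we inflate per call, analogous to CHUNK_SIZE in the patched module.
MAX_INFLATE = 5 * 1024 * 1024


def estimate_uncompressed_size(chunks):
  """ Streams compressed chunks through a decompressor and returns the total inflated size,
      or None if the data is not a valid gzip/zlib stream (mirroring the is_valid handling
      added to SizeInfo in this change).
  """
  decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
  total = 0
  for buf in chunks:
    current_data = buf
    while len(current_data) > 0:
      try:
        total += len(decompressor.decompress(current_data, MAX_INFLATE))
      except zlib.error:
        return None

      # Loop until the decompressor has consumed everything we handed it.
      current_data = decompressor.unconsumed_tail
  return total


if __name__ == '__main__':
  # Hypothetical usage: gzip 100000 bytes, then recover that size from the compressed stream.
  buf = io.BytesIO()
  with gzip.GzipFile(fileobj=buf, mode='wb') as fp:
    fp.write(b'x' * 100000)
  compressed = buf.getvalue()
  chunks = [compressed[i:i + 4096] for i in range(0, len(compressed), 4096)]
  print(estimate_uncompressed_size(chunks))  # -> 100000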