Handle the common case of one chunk when calculating the uncompressed size
Reference #992
parent 1323da20e3
commit 54095eb5cb
6 changed files with 65 additions and 4 deletions
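For context, the approach in this commit is: while the first chunk streams through, gunzip it on the fly and count the uncompressed bytes; if a later chunk arrives, discard the value, since in the common case Docker uploads a blob as a single chunk. A rough standalone sketch of that counting handler (hypothetical names and constants; the real implementation is the change to util/registry/gzipstream.py below):

import zlib

def make_size_counter():
  # Sketch only: zlib.MAX_WBITS | 32 lets zlib auto-detect a gzip header.
  decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
  state = {'uncompressed_size': 0, 'is_valid': True}

  def handler(buf):
    # Called for each buffer as it is copied to backend storage; never raises.
    if not state['is_valid']:
      return
    current = buf
    try:
      while current:
        # Cap the output per call so a huge buffer cannot blow up memory,
        # then feed the unconsumed tail back in until it is empty.
        state['uncompressed_size'] += len(decompressor.decompress(current, 8192))
        current = decompressor.unconsumed_tail
    except zlib.error:
      # Not a valid gzip stream; give up on the size calculation.
      state['is_valid'] = False
      state['uncompressed_size'] = None

  return state, handler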
@@ -784,6 +784,8 @@ class BlobUpload(BaseModel):
   sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
   location = ForeignKeyField(ImageStorageLocation)
   storage_metadata = JSONField(null=True, default={})
+  chunk_count = IntegerField(default=0)
+  uncompressed_byte_count = IntegerField(null=True)

   class Meta:
     database = db
@@ -0,0 +1,28 @@
+"""Add new blobupload columns
+
+Revision ID: 403d02fea323
+Revises: 10b999e8db1f
+Create Date: 2015-11-30 14:25:46.822730
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '403d02fea323'
+down_revision = '10b999e8db1f'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+def upgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('blobupload', sa.Column('chunk_count', sa.Integer(), nullable=False))
+    op.add_column('blobupload', sa.Column('uncompressed_byte_count', sa.Integer(), nullable=True))
+    ### end Alembic commands ###
+
+
+def downgrade(tables):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('blobupload', 'uncompressed_byte_count')
+    op.drop_column('blobupload', 'chunk_count')
+    ### end Alembic commands ###
@@ -26,7 +26,7 @@ def get_repo_blob_by_digest(namespace, repo_name, blob_digest):


 def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_obj, byte_count,
-                                    link_expiration_s):
+                                    link_expiration_s, uncompressed_byte_count=None):
   """ Store a record of the blob and temporarily link it to the specified repository.
   """
   random_image_name = str(uuid4())
@@ -35,6 +35,7 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
   try:
     storage = ImageStorage.get(content_checksum=blob_digest)
     storage.image_size = byte_count
+    storage.uncompressed_size = uncompressed_byte_count
     storage.save()

     ImageStoragePlacement.get(storage=storage, location=location_obj)
@@ -13,6 +13,7 @@ from auth.jwt_auth import process_jwt_auth
 from endpoints.decorators import anon_protect
 from util.cache import cache_control
 from util.registry.filelike import wrap_with_handler, StreamSlice
+from util.registry.gzipstream import calculate_size_handler
 from storage.basestorage import InvalidChunkException


@@ -220,6 +221,14 @@ def _upload_chunk(namespace, repo_name, upload_uuid):

   input_fp = wrap_with_handler(input_fp, found.sha_state.update)

+  # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
+  # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
+  # comes in, but in the common case Docker only sends one chunk.
+  size_info = None
+  if start_offset == 0 and found.chunk_count == 0:
+    size_info, fn = calculate_size_handler()
+    input_fp = wrap_with_handler(input_fp, fn)
+
   try:
     length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid,
                                                                       start_offset, length, input_fp,
@@ -228,8 +237,18 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
   except InvalidChunkException:
     _range_not_satisfiable(found.byte_count)

+  # If we determined an uncompressed size and this is the first chunk, add it to the blob.
+  # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
+  if size_info is not None and found.chunk_count == 0 and size_info.is_valid:
+    found.uncompressed_byte_count = size_info.uncompressed_size
+  elif length_written > 0:
+    # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
+    # know the uncompressed size.
+    found.uncompressed_byte_count = None
+
   found.storage_metadata = new_metadata
   found.byte_count += length_written
+  found.chunk_count += 1
   return found, error


@@ -257,7 +276,8 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
   # Mark the blob as uploaded.
   model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
                                              upload_obj.location, upload_obj.byte_count,
-                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
+                                             app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
+                                             upload_obj.uncompressed_byte_count)

   # Delete the upload tracking row.
   upload_obj.delete_instance()
Binary file not shown.
@@ -16,6 +16,7 @@ class SizeInfo(object):
   def __init__(self):
     self.uncompressed_size = 0
     self.compressed_size = 0
+    self.is_valid = True

 def calculate_size_handler():
   """ Returns an object and a SocketReader handler. The handler will gunzip the data it receives,
@@ -23,17 +24,26 @@ def calculate_size_handler():
   """

   size_info = SizeInfo()

   decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)

   def fn(buf):
+    if not size_info.is_valid:
+      return
+
     # Note: We set a maximum CHUNK_SIZE to prevent the decompress from taking too much
     # memory. As a result, we have to loop until the unconsumed tail is empty.
     current_data = buf
     size_info.compressed_size += len(current_data)

     while len(current_data) > 0:
-      size_info.uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
+      try:
+        size_info.uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
+      except:
+        # The gzip stream is not valid for some reason.
+        size_info.uncompressed_size = None
+        size_info.is_valid = False
+        return
+
       current_data = decompressor.unconsumed_tail

       # Make sure we allow the scheduler to do other work if we get stuck in this tight loop.
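As a usage sketch (hypothetical driver code, not part of the commit), the (size_info, fn) pair returned by calculate_size_handler can be fed buffers of a gzip stream and then inspected; with the change above, feeding it data that is not valid gzip flips is_valid to False instead of raising:

import gzip
import io

from util.registry.gzipstream import calculate_size_handler

payload = b'hello world' * 1000
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
  gz.write(payload)

size_info, fn = calculate_size_handler()
fn(buf.getvalue())  # in the upload path this is invoked per streamed buffer via wrap_with_handler

assert size_info.is_valid
assert size_info.uncompressed_size == len(payload)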