Change V1 upload code to use the new blob uploader
This commit is contained in:
parent
ba39737c3a
commit
077c3908e4
3 changed files with 48 additions and 64 deletions
|
@ -79,9 +79,9 @@ class DockerRegistryV1DataInterface(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size):
|
def update_image_blob(self, namespace_name, repo_name, image_id, blob):
|
||||||
"""
|
"""
|
||||||
Updates the sizing information for the image with the given V1 Docker ID.
|
Updates the blob for the image with the given V1 Docker ID.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from app import app, storage as store
|
from app import app, storage as store
|
||||||
from data import model
|
from data import model
|
||||||
|
from data.database import db_transaction
|
||||||
from endpoints.v1.models_interface import DockerRegistryV1DataInterface, Repository
|
from endpoints.v1.models_interface import DockerRegistryV1DataInterface, Repository
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
|
|
||||||
|
@ -56,8 +57,8 @@ class PreOCIModel(DockerRegistryV1DataInterface):
|
||||||
if repo_image is None or repo_image.storage is None:
|
if repo_image is None or repo_image.storage is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
assert repo_image.storage.content_checksum == content_checksum
|
||||||
with model.db_transaction():
|
with model.db_transaction():
|
||||||
repo_image.storage.content_checksum = content_checksum
|
|
||||||
repo_image.v1_checksum = checksum
|
repo_image.v1_checksum = checksum
|
||||||
repo_image.storage.save()
|
repo_image.storage.save()
|
||||||
repo_image.save()
|
repo_image.save()
|
||||||
|
@ -77,9 +78,19 @@ class PreOCIModel(DockerRegistryV1DataInterface):
|
||||||
repo_image.storage.save()
|
repo_image.storage.save()
|
||||||
return repo_image.storage
|
return repo_image.storage
|
||||||
|
|
||||||
def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size):
|
def update_image_blob(self, namespace_name, repo_name, image_id, blob):
|
||||||
model.storage.set_image_storage_metadata(image_id, namespace_name, repo_name, size,
|
# Retrieve the existing image storage record and replace it with that given by the blob.
|
||||||
uncompressed_size)
|
repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id)
|
||||||
|
if repo_image is None or repo_image.storage is None or not repo_image.storage.uploading:
|
||||||
|
return False
|
||||||
|
|
||||||
|
with db_transaction():
|
||||||
|
existing_storage = repo_image.storage
|
||||||
|
|
||||||
|
repo_image.storage = blob._db_id
|
||||||
|
repo_image.save()
|
||||||
|
|
||||||
|
existing_storage.delete_instance(recursive=True)
|
||||||
|
|
||||||
def get_image_size(self, namespace_name, repo_name, image_id):
|
def get_image_size(self, namespace_name, repo_name, image_id):
|
||||||
repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id)
|
repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id)
|
||||||
|
|
|
@ -12,6 +12,8 @@ from auth.auth_context import get_authenticated_user
|
||||||
from auth.decorators import extract_namespace_repo_from_session, process_auth
|
from auth.decorators import extract_namespace_repo_from_session, process_auth
|
||||||
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission)
|
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission)
|
||||||
from data import model, database
|
from data import model, database
|
||||||
|
from data.registry_model import registry_model
|
||||||
|
from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException
|
||||||
from digest import checksums
|
from digest import checksums
|
||||||
from endpoints.v1 import v1_bp
|
from endpoints.v1 import v1_bp
|
||||||
from endpoints.v1.models_pre_oci import pre_oci_model as model
|
from endpoints.v1.models_pre_oci import pre_oci_model as model
|
||||||
|
@ -26,14 +28,6 @@ from util.registry.torrent import PieceHasher
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _finish_image(namespace, repository, image_id):
|
|
||||||
# Checksum is ok, we remove the marker
|
|
||||||
blob_ref = model.update_image_uploading(namespace, repository, image_id, False)
|
|
||||||
|
|
||||||
# Send a job to the work queue to replicate the image layer.
|
|
||||||
queue_storage_replication(namespace, blob_ref)
|
|
||||||
|
|
||||||
|
|
||||||
def require_completion(f):
|
def require_completion(f):
|
||||||
"""This make sure that the image push correctly finished."""
|
"""This make sure that the image push correctly finished."""
|
||||||
|
|
||||||
|
@ -183,51 +177,44 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
# encoding (Gunicorn)
|
# encoding (Gunicorn)
|
||||||
input_stream = request.environ['wsgi.input']
|
input_stream = request.environ['wsgi.input']
|
||||||
|
|
||||||
# Create a socket reader to read the input stream containing the layer data.
|
repository_ref = registry_model.lookup_repository(namespace, repository)
|
||||||
sr = SocketReader(input_stream)
|
if repository_ref is None:
|
||||||
|
abort(404)
|
||||||
|
|
||||||
|
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
|
||||||
|
settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'],
|
||||||
|
bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'],
|
||||||
|
committed_blob_expiration=expiration_sec)
|
||||||
|
|
||||||
|
extra_handlers = []
|
||||||
|
|
||||||
# Add a handler that copies the data into a temp file. This is used to calculate the tarsum,
|
# Add a handler that copies the data into a temp file. This is used to calculate the tarsum,
|
||||||
# which is only needed for older versions of Docker.
|
# which is only needed for older versions of Docker.
|
||||||
requires_tarsum = session.get('checksum_format') == 'tarsum'
|
requires_tarsum = session.get('checksum_format') == 'tarsum'
|
||||||
if requires_tarsum:
|
if requires_tarsum:
|
||||||
tmp, tmp_hndlr = store.temp_store_handler()
|
tmp, tmp_hndlr = store.temp_store_handler()
|
||||||
sr.add_handler(tmp_hndlr)
|
extra_handlers.append(tmp_hndlr)
|
||||||
|
|
||||||
# Add a handler to compute the compressed and uncompressed sizes of the layer.
|
# Add a handler which computes the simple Docker V1 checksum.
|
||||||
size_info, size_hndlr = gzipstream.calculate_size_handler()
|
|
||||||
sr.add_handler(size_hndlr)
|
|
||||||
|
|
||||||
# Add a handler to hash the chunks of the upload for torrenting
|
|
||||||
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'])
|
|
||||||
sr.add_handler(piece_hasher.update)
|
|
||||||
|
|
||||||
# Add a handler which computes the checksum.
|
|
||||||
h, sum_hndlr = checksums.simple_checksum_handler(v1_metadata.compat_json)
|
h, sum_hndlr = checksums.simple_checksum_handler(v1_metadata.compat_json)
|
||||||
sr.add_handler(sum_hndlr)
|
extra_handlers.append(sum_hndlr)
|
||||||
|
|
||||||
# Add a handler which computes the content checksum only
|
uploaded_blob = None
|
||||||
ch, content_sum_hndlr = checksums.content_checksum_handler()
|
try:
|
||||||
sr.add_handler(content_sum_hndlr)
|
with upload_blob(repository_ref, store, settings,
|
||||||
|
extra_blob_stream_handlers=extra_handlers) as manager:
|
||||||
|
manager.upload_chunk(app.config, input_stream)
|
||||||
|
uploaded_blob = manager.commit_to_blob(app.config)
|
||||||
|
except BlobUploadException:
|
||||||
|
logger.exception('Exception when writing image data')
|
||||||
|
abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
|
||||||
|
|
||||||
# Stream write the data to storage.
|
# Save the blob for the image.
|
||||||
locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id)
|
model.update_image_blob(namespace, repository, image_id, uploaded_blob)
|
||||||
with database.CloseForLongOperation(app.config):
|
|
||||||
try:
|
|
||||||
start_time = time()
|
|
||||||
store.stream_write(locations, path, sr)
|
|
||||||
metric_queue.chunk_size.Observe(size_info.compressed_size, labelvalues=[list(locations)[0]])
|
|
||||||
metric_queue.chunk_upload_time.Observe(time() - start_time, labelvalues=[list(locations)[0]])
|
|
||||||
except IOError:
|
|
||||||
logger.exception('Exception when writing image data')
|
|
||||||
abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id)
|
|
||||||
|
|
||||||
# Save the size of the image.
|
# Send a job to the work queue to replicate the image layer.
|
||||||
model.update_image_sizes(namespace, repository, image_id, size_info.compressed_size,
|
# TODO: move this into a better place.
|
||||||
size_info.uncompressed_size)
|
queue_storage_replication(namespace, uploaded_blob)
|
||||||
|
|
||||||
# Save the BitTorrent pieces.
|
|
||||||
model.create_bittorrent_pieces(namespace, repository, image_id,
|
|
||||||
piece_hasher.final_piece_hashes())
|
|
||||||
|
|
||||||
# Append the computed checksum.
|
# Append the computed checksum.
|
||||||
csums = []
|
csums = []
|
||||||
|
@ -245,7 +232,7 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
# We don't have a checksum stored yet, that's fine skipping the check.
|
# We don't have a checksum stored yet, that's fine skipping the check.
|
||||||
# Not removing the mark though, image is not downloadable yet.
|
# Not removing the mark though, image is not downloadable yet.
|
||||||
session['checksum'] = csums
|
session['checksum'] = csums
|
||||||
session['content_checksum'] = 'sha256:{0}'.format(ch.hexdigest())
|
session['content_checksum'] = uploaded_blob.digest
|
||||||
return make_response('true', 200)
|
return make_response('true', 200)
|
||||||
|
|
||||||
# We check if the checksums provided matches one the one we computed
|
# We check if the checksums provided matches one the one we computed
|
||||||
|
@ -254,9 +241,6 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s',
|
abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s',
|
||||||
issue='checksum-mismatch', image_id=image_id)
|
issue='checksum-mismatch', image_id=image_id)
|
||||||
|
|
||||||
# Mark the image as uploaded.
|
|
||||||
_finish_image(namespace, repository, image_id)
|
|
||||||
|
|
||||||
return make_response('true', 200)
|
return make_response('true', 200)
|
||||||
|
|
||||||
|
|
||||||
|
@ -306,20 +290,12 @@ def put_image_checksum(namespace, repository, image_id):
|
||||||
if not v1_metadata.compat_json:
|
if not v1_metadata.compat_json:
|
||||||
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
|
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
|
||||||
|
|
||||||
logger.debug('Marking image path')
|
logger.debug('Storing image and checksum')
|
||||||
if not model.is_image_uploading(namespace, repository, image_id):
|
|
||||||
abort(409, 'Cannot set checksum for image %(image_id)s', issue='image-write-error',
|
|
||||||
image_id=image_id)
|
|
||||||
|
|
||||||
logger.debug('Storing image and content checksums')
|
|
||||||
|
|
||||||
content_checksum = session.get('content_checksum', None)
|
content_checksum = session.get('content_checksum', None)
|
||||||
checksum_parts = checksum.split(':')
|
checksum_parts = checksum.split(':')
|
||||||
if len(checksum_parts) != 2:
|
if len(checksum_parts) != 2:
|
||||||
abort(400, 'Invalid checksum format')
|
abort(400, 'Invalid checksum format')
|
||||||
|
|
||||||
model.store_docker_v1_checksums(namespace, repository, image_id, checksum, content_checksum)
|
|
||||||
|
|
||||||
if checksum not in session.get('checksum', []):
|
if checksum not in session.get('checksum', []):
|
||||||
logger.debug('session checksums: %s', session.get('checksum', []))
|
logger.debug('session checksums: %s', session.get('checksum', []))
|
||||||
logger.debug('client supplied checksum: %s', checksum)
|
logger.debug('client supplied checksum: %s', checksum)
|
||||||
|
@ -327,9 +303,6 @@ def put_image_checksum(namespace, repository, image_id):
|
||||||
abort(400, 'Checksum mismatch for image: %(image_id)s', issue='checksum-mismatch',
|
abort(400, 'Checksum mismatch for image: %(image_id)s', issue='checksum-mismatch',
|
||||||
image_id=image_id)
|
image_id=image_id)
|
||||||
|
|
||||||
# Mark the image as uploaded.
|
|
||||||
_finish_image(namespace, repository, image_id)
|
|
||||||
|
|
||||||
return make_response('true', 200)
|
return make_response('true', 200)
|
||||||
|
|
||||||
|
|
||||||
|
|
Reference in a new issue