Merge pull request #3057 from quay/joseph.schorr/QUAY-893/blob-mount
Implement support for blob mounting via the `mount` parameter on blob uploads
This commit is contained in:
commit
e22b0ce004
8 changed files with 291 additions and 63 deletions
|
@ -30,9 +30,7 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
|
||||||
link_expiration_s, uncompressed_byte_count=None):
|
link_expiration_s, uncompressed_byte_count=None):
|
||||||
""" Store a record of the blob and temporarily link it to the specified repository.
|
""" Store a record of the blob and temporarily link it to the specified repository.
|
||||||
"""
|
"""
|
||||||
random_image_name = str(uuid4())
|
|
||||||
with db_transaction():
|
with db_transaction():
|
||||||
repo = _basequery.get_existing_repository(namespace, repo_name)
|
|
||||||
try:
|
try:
|
||||||
storage = ImageStorage.get(content_checksum=blob_digest)
|
storage = ImageStorage.get(content_checksum=blob_digest)
|
||||||
storage.image_size = byte_count
|
storage.image_size = byte_count
|
||||||
|
@ -51,12 +49,33 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
|
||||||
except ImageStoragePlacement.DoesNotExist:
|
except ImageStoragePlacement.DoesNotExist:
|
||||||
ImageStoragePlacement.create(storage=storage, location=location_obj)
|
ImageStoragePlacement.create(storage=storage, location=location_obj)
|
||||||
|
|
||||||
# Create a temporary link into the repository, to be replaced by the v1 metadata later
|
_temp_link_blob(namespace, repo_name, storage, link_expiration_s)
|
||||||
# and create a temporary tag to reference it
|
return storage
|
||||||
image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
|
|
||||||
tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
|
|
||||||
|
|
||||||
return storage
|
|
||||||
|
def temp_link_blob(namespace, repo_name, blob_digest, link_expiration_s):
|
||||||
|
""" Temporarily links to the blob record from the given namespace. If the blob record is not
|
||||||
|
found, return None.
|
||||||
|
"""
|
||||||
|
with db_transaction():
|
||||||
|
try:
|
||||||
|
storage = ImageStorage.get(content_checksum=blob_digest)
|
||||||
|
except ImageStorage.DoesNotExist:
|
||||||
|
return None
|
||||||
|
|
||||||
|
_temp_link_blob(namespace, repo_name, storage, link_expiration_s)
|
||||||
|
return storage
|
||||||
|
|
||||||
|
|
||||||
|
def _temp_link_blob(namespace, repo_name, storage, link_expiration_s):
|
||||||
|
""" Note: Should *always* be called by a parent under a transaction. """
|
||||||
|
random_image_name = str(uuid4())
|
||||||
|
repo = _basequery.get_existing_repository(namespace, repo_name)
|
||||||
|
|
||||||
|
# Create a temporary link into the repository, to be replaced by the v1 metadata later
|
||||||
|
# and create a temporary tag to reference it
|
||||||
|
image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
|
||||||
|
tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
|
||||||
|
|
||||||
|
|
||||||
def get_stale_blob_upload(stale_timespan):
|
def get_stale_blob_upload(stale_timespan):
|
||||||
|
|
|
@ -9,16 +9,19 @@ import resumablehashlib
|
||||||
|
|
||||||
from app import storage, app, get_app_url, metric_queue, model_cache
|
from app import storage, app, get_app_url, metric_queue, model_cache
|
||||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||||
|
from auth.permissions import ReadRepositoryPermission
|
||||||
from data import database
|
from data import database
|
||||||
from data.cache import cache_key
|
from data.cache import cache_key
|
||||||
from digest import digest_tools
|
from digest import digest_tools
|
||||||
from endpoints.decorators import anon_protect, parse_repository_name
|
from endpoints.decorators import anon_protect, parse_repository_name
|
||||||
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
||||||
from endpoints.v2.errors import (
|
from endpoints.v2.errors import (
|
||||||
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge)
|
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
|
||||||
|
InvalidRequest)
|
||||||
from endpoints.v2.models_interface import Blob
|
from endpoints.v2.models_interface import Blob
|
||||||
from endpoints.v2.models_pre_oci import data_model as model
|
from endpoints.v2.models_pre_oci import data_model as model
|
||||||
from util.cache import cache_control
|
from util.cache import cache_control
|
||||||
|
from util.names import parse_namespace_repository
|
||||||
from util.registry.filelike import wrap_with_handler, StreamSlice
|
from util.registry.filelike import wrap_with_handler, StreamSlice
|
||||||
from util.registry.gzipstream import calculate_size_handler
|
from util.registry.gzipstream import calculate_size_handler
|
||||||
from util.registry.torrent import PieceHasher
|
from util.registry.torrent import PieceHasher
|
||||||
|
@ -121,6 +124,59 @@ def download_blob(namespace_name, repo_name, digest):
|
||||||
'Content-Type': BLOB_CONTENT_TYPE,}),)
|
'Content-Type': BLOB_CONTENT_TYPE,}),)
|
||||||
|
|
||||||
|
|
||||||
|
def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest):
|
||||||
|
""" Attempts to mount a blob requested by the user from another repository. """
|
||||||
|
logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name,
|
||||||
|
repo_name)
|
||||||
|
from_repo = request.args.get('from', None)
|
||||||
|
if from_repo is None:
|
||||||
|
raise InvalidRequest
|
||||||
|
|
||||||
|
# Ensure the user has access to the repository.
|
||||||
|
logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`',
|
||||||
|
mount_blob_digest, from_repo, namespace_name, repo_name)
|
||||||
|
from_namespace, from_repo_name = parse_namespace_repository(from_repo,
|
||||||
|
app.config['LIBRARY_NAMESPACE'],
|
||||||
|
include_tag=False)
|
||||||
|
|
||||||
|
# First check permission. This does not hit the DB so we do it first.
|
||||||
|
read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can()
|
||||||
|
if not read_permission:
|
||||||
|
# If no direct permission, check if the repostory is public.
|
||||||
|
if not model.is_repository_public(from_namespace, from_repo_name):
|
||||||
|
logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`',
|
||||||
|
mount_blob_digest, from_repo, namespace_name, repo_name)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Lookup if the mount blob's digest exists in the repository.
|
||||||
|
mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest)
|
||||||
|
if mount_blob is None:
|
||||||
|
logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo)
|
||||||
|
return None
|
||||||
|
|
||||||
|
logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
|
||||||
|
from_repo, namespace_name, repo_name)
|
||||||
|
|
||||||
|
# Mount the blob into the current repository and return that we've completed the operation.
|
||||||
|
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
|
||||||
|
if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec):
|
||||||
|
logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest,
|
||||||
|
from_repo)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Return the response for the blob indicating that it was mounted, and including its content
|
||||||
|
# digest.
|
||||||
|
logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
|
||||||
|
from_repo, namespace_name, repo_name)
|
||||||
|
return Response(
|
||||||
|
status=201,
|
||||||
|
headers={
|
||||||
|
'Docker-Content-Digest': mount_blob_digest,
|
||||||
|
'Location':
|
||||||
|
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
|
||||||
|
(namespace_name, repo_name), digest=mount_blob_digest),},)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
||||||
@parse_repository_name()
|
@parse_repository_name()
|
||||||
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
||||||
|
@ -135,6 +191,14 @@ def start_blob_upload(namespace_name, repo_name):
|
||||||
if not repository_exists:
|
if not repository_exists:
|
||||||
raise NameUnknown()
|
raise NameUnknown()
|
||||||
|
|
||||||
|
# Check for mounting of a blob from another repository.
|
||||||
|
mount_blob_digest = request.args.get('mount', None)
|
||||||
|
if mount_blob_digest is not None:
|
||||||
|
response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest)
|
||||||
|
if response is not None:
|
||||||
|
return response
|
||||||
|
|
||||||
|
# Check for a normal blob upload.
|
||||||
digest = request.args.get('digest', None)
|
digest = request.args.get('digest', None)
|
||||||
if digest is None:
|
if digest is None:
|
||||||
# Short-circuit because the user will send the blob data in another request.
|
# Short-circuit because the user will send the blob data in another request.
|
||||||
|
|
|
@ -220,6 +220,14 @@ class DockerRegistryV2DataInterface(object):
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec):
|
||||||
|
"""
|
||||||
|
Mounts an existing blob and links a temporary tag with the specified expiration to it under
|
||||||
|
the matching repository. Returns True on success and False on failure.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
|
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
|
||||||
expiration_sec):
|
expiration_sec):
|
||||||
|
@ -276,3 +284,8 @@ class DockerRegistryV2DataInterface(object):
|
||||||
""" Returns whether the given namespace is enabled. If the namespace doesn't exist,
|
""" Returns whether the given namespace is enabled. If the namespace doesn't exist,
|
||||||
returns True. """
|
returns True. """
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def is_repository_public(self, namespace_name, repo_name):
|
||||||
|
""" Returns True if the repository with the given name exists and is public. """
|
||||||
|
pass
|
||||||
|
|
|
@ -190,6 +190,10 @@ class PreOCIModel(DockerRegistryV2DataInterface):
|
||||||
except model.InvalidBlobUpload:
|
except model.InvalidBlobUpload:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec):
|
||||||
|
return model.blob.temp_link_blob(namespace_name, repo_name, existing_blob.digest,
|
||||||
|
expiration_sec)
|
||||||
|
|
||||||
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
|
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
|
||||||
expiration_sec):
|
expiration_sec):
|
||||||
location_obj = model.storage.get_image_location_for_name(blob_upload.location_name)
|
location_obj = model.storage.get_image_location_for_name(blob_upload.location_name)
|
||||||
|
@ -262,6 +266,9 @@ class PreOCIModel(DockerRegistryV2DataInterface):
|
||||||
namespace = model.user.get_namespace_user(namespace_name)
|
namespace = model.user.get_namespace_user(namespace_name)
|
||||||
return namespace is None or namespace.enabled
|
return namespace is None or namespace.enabled
|
||||||
|
|
||||||
|
def is_repository_public(self, namespace_name, repo_name):
|
||||||
|
return model.repository.repository_is_public(namespace_name, repo_name)
|
||||||
|
|
||||||
|
|
||||||
def _docker_v1_metadata(namespace_name, repo_name, repo_image):
|
def _docker_v1_metadata(namespace_name, repo_name, repo_image):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -58,3 +58,71 @@ def test_blob_caching(method, endpoint, client, app):
|
||||||
with assert_query_count(0):
|
with assert_query_count(0):
|
||||||
conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
|
conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
|
||||||
headers=headers)
|
headers=headers)
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('mount_digest, source_repo, username, expect_success', [
|
||||||
|
# Unknown blob.
|
||||||
|
('sha256:unknown', 'devtable/simple', 'devtable', False),
|
||||||
|
|
||||||
|
# Blob not in repo.
|
||||||
|
('sha256:' + hashlib.sha256("a").hexdigest(), 'devtable/complex', 'devtable', False),
|
||||||
|
|
||||||
|
# Blob in repo.
|
||||||
|
('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'devtable', True),
|
||||||
|
|
||||||
|
# No access to repo.
|
||||||
|
('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'public', False),
|
||||||
|
|
||||||
|
# Public repo.
|
||||||
|
('sha256:' + hashlib.sha256("c").hexdigest(), 'public/publicrepo', 'devtable', True),
|
||||||
|
])
|
||||||
|
def test_blob_mounting(mount_digest, source_repo, username, expect_success, client, app):
|
||||||
|
location = ImageStorageLocation.get(name='local_us')
|
||||||
|
|
||||||
|
# Store and link some blobs.
|
||||||
|
digest = 'sha256:' + hashlib.sha256("a").hexdigest()
|
||||||
|
model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000)
|
||||||
|
|
||||||
|
digest = 'sha256:' + hashlib.sha256("b").hexdigest()
|
||||||
|
model.blob.store_blob_record_and_temp_link('devtable', 'complex', digest, location, 1, 10000000)
|
||||||
|
|
||||||
|
digest = 'sha256:' + hashlib.sha256("c").hexdigest()
|
||||||
|
model.blob.store_blob_record_and_temp_link('public', 'publicrepo', digest, location, 1, 10000000)
|
||||||
|
|
||||||
|
params = {
|
||||||
|
'repository': 'devtable/building',
|
||||||
|
'mount': mount_digest,
|
||||||
|
'from': source_repo,
|
||||||
|
}
|
||||||
|
|
||||||
|
user = model.user.get_user(username)
|
||||||
|
access = [{
|
||||||
|
'type': 'repository',
|
||||||
|
'name': 'devtable/building',
|
||||||
|
'actions': ['pull', 'push'],
|
||||||
|
}]
|
||||||
|
|
||||||
|
if source_repo.find(username) == 0:
|
||||||
|
access.append({
|
||||||
|
'type': 'repository',
|
||||||
|
'name': source_repo,
|
||||||
|
'actions': ['pull'],
|
||||||
|
})
|
||||||
|
|
||||||
|
context, subject = build_context_and_subject(ValidatedAuthContext(user=user))
|
||||||
|
token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600,
|
||||||
|
instance_keys)
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'Authorization': 'Bearer %s' % token,
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_code = 201 if expect_success else 202
|
||||||
|
conduct_call(client, 'v2.start_blob_upload', url_for, 'POST', params, expected_code=expected_code,
|
||||||
|
headers=headers)
|
||||||
|
|
||||||
|
if expect_success:
|
||||||
|
# Ensure the blob now exists under the repo.
|
||||||
|
model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest)
|
||||||
|
else:
|
||||||
|
with pytest.raises(model.blob.BlobDoesNotExist):
|
||||||
|
model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest)
|
||||||
|
|
|
@ -15,6 +15,7 @@ class V2ProtocolSteps(Enum):
|
||||||
BLOB_HEAD_CHECK = 'blob-head-check'
|
BLOB_HEAD_CHECK = 'blob-head-check'
|
||||||
GET_MANIFEST = 'get-manifest'
|
GET_MANIFEST = 'get-manifest'
|
||||||
PUT_MANIFEST = 'put-manifest'
|
PUT_MANIFEST = 'put-manifest'
|
||||||
|
MOUNT_BLOB = 'mount-blob'
|
||||||
|
|
||||||
|
|
||||||
class V2Protocol(RegistryProtocol):
|
class V2Protocol(RegistryProtocol):
|
||||||
|
@ -27,6 +28,9 @@ class V2Protocol(RegistryProtocol):
|
||||||
Failures.INVALID_REPOSITORY: 400,
|
Failures.INVALID_REPOSITORY: 400,
|
||||||
Failures.NAMESPACE_DISABLED: 400,
|
Failures.NAMESPACE_DISABLED: 400,
|
||||||
},
|
},
|
||||||
|
V2ProtocolSteps.MOUNT_BLOB: {
|
||||||
|
Failures.UNAUTHORIZED_FOR_MOUNT: 202,
|
||||||
|
},
|
||||||
V2ProtocolSteps.GET_MANIFEST: {
|
V2ProtocolSteps.GET_MANIFEST: {
|
||||||
Failures.UNKNOWN_TAG: 404,
|
Failures.UNKNOWN_TAG: 404,
|
||||||
Failures.UNAUTHORIZED: 403,
|
Failures.UNAUTHORIZED: 403,
|
||||||
|
@ -92,8 +96,7 @@ class V2Protocol(RegistryProtocol):
|
||||||
}
|
}
|
||||||
|
|
||||||
if scopes:
|
if scopes:
|
||||||
params['scope'] = 'repository:%s:%s' % (self.repo_name(namespace, repo_name),
|
params['scope'] = scopes
|
||||||
','.join(scopes))
|
|
||||||
|
|
||||||
response = self.conduct(session, 'GET', '/v2/auth', params=params, auth=auth,
|
response = self.conduct(session, 'GET', '/v2/auth', params=params, auth=auth,
|
||||||
expected_status=(200, expected_failure, V2ProtocolSteps.AUTH))
|
expected_status=(200, expected_failure, V2ProtocolSteps.AUTH))
|
||||||
|
@ -107,7 +110,7 @@ class V2Protocol(RegistryProtocol):
|
||||||
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
|
def push(self, session, namespace, repo_name, tag_names, images, credentials=None,
|
||||||
expected_failure=None, options=None):
|
expected_failure=None, options=None):
|
||||||
options = options or ProtocolOptions()
|
options = options or ProtocolOptions()
|
||||||
scopes = options.scopes or ['push', 'pull']
|
scopes = options.scopes or ['repository:%s:push,pull' % self.repo_name(namespace, repo_name)]
|
||||||
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
||||||
|
|
||||||
# Ping!
|
# Ping!
|
||||||
|
@ -160,64 +163,79 @@ class V2Protocol(RegistryProtocol):
|
||||||
expected_status=(404, expected_failure, V2ProtocolSteps.BLOB_HEAD_CHECK),
|
expected_status=(404, expected_failure, V2ProtocolSteps.BLOB_HEAD_CHECK),
|
||||||
headers=headers)
|
headers=headers)
|
||||||
|
|
||||||
# Start a new upload of the layer data.
|
# Check for mounting of blobs.
|
||||||
response = self.conduct(session, 'POST',
|
if options.mount_blobs and image.id in options.mount_blobs:
|
||||||
'/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name),
|
self.conduct(session, 'POST',
|
||||||
expected_status=202,
|
'/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name),
|
||||||
headers=headers)
|
params={
|
||||||
|
'mount': checksum,
|
||||||
upload_uuid = response.headers['Docker-Upload-UUID']
|
'from': options.mount_blobs[image.id],
|
||||||
new_upload_location = response.headers['Location']
|
},
|
||||||
assert new_upload_location.startswith('http://localhost:5000')
|
expected_status=(201, expected_failure, V2ProtocolSteps.MOUNT_BLOB),
|
||||||
|
|
||||||
# We need to make this relative just for the tests because the live server test
|
|
||||||
# case modifies the port.
|
|
||||||
location = response.headers['Location'][len('http://localhost:5000'):]
|
|
||||||
|
|
||||||
# PATCH the image data into the layer.
|
|
||||||
if options.chunks_for_upload is None:
|
|
||||||
self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204,
|
|
||||||
headers=headers)
|
headers=headers)
|
||||||
|
if expected_failure is not None:
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
# If chunked upload is requested, upload the data as a series of chunks, checking
|
# Start a new upload of the layer data.
|
||||||
# status at every point.
|
response = self.conduct(session, 'POST',
|
||||||
for chunk_data in options.chunks_for_upload:
|
'/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name),
|
||||||
if len(chunk_data) == 3:
|
expected_status=202,
|
||||||
(start_byte, end_byte, expected_code) = chunk_data
|
headers=headers)
|
||||||
else:
|
|
||||||
(start_byte, end_byte) = chunk_data
|
|
||||||
expected_code = 204
|
|
||||||
|
|
||||||
patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)}
|
upload_uuid = response.headers['Docker-Upload-UUID']
|
||||||
patch_headers.update(headers)
|
new_upload_location = response.headers['Location']
|
||||||
|
assert new_upload_location.startswith('http://localhost:5000')
|
||||||
|
|
||||||
contents_chunk = image.bytes[start_byte:end_byte]
|
# We need to make this relative just for the tests because the live server test
|
||||||
self.conduct(session, 'PATCH', location, data=contents_chunk,
|
# case modifies the port.
|
||||||
expected_status=expected_code,
|
location = response.headers['Location'][len('http://localhost:5000'):]
|
||||||
headers=patch_headers)
|
|
||||||
if expected_code != 204:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Retrieve the upload status at each point, and ensure it is valid.
|
# PATCH the image data into the layer.
|
||||||
|
if options.chunks_for_upload is None:
|
||||||
|
self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204,
|
||||||
|
headers=headers)
|
||||||
|
else:
|
||||||
|
# If chunked upload is requested, upload the data as a series of chunks, checking
|
||||||
|
# status at every point.
|
||||||
|
for chunk_data in options.chunks_for_upload:
|
||||||
|
if len(chunk_data) == 3:
|
||||||
|
(start_byte, end_byte, expected_code) = chunk_data
|
||||||
|
else:
|
||||||
|
(start_byte, end_byte) = chunk_data
|
||||||
|
expected_code = 204
|
||||||
|
|
||||||
|
patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)}
|
||||||
|
patch_headers.update(headers)
|
||||||
|
|
||||||
|
contents_chunk = image.bytes[start_byte:end_byte]
|
||||||
|
self.conduct(session, 'PATCH', location, data=contents_chunk,
|
||||||
|
expected_status=expected_code,
|
||||||
|
headers=patch_headers)
|
||||||
|
if expected_code != 204:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Retrieve the upload status at each point, and ensure it is valid.
|
||||||
|
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name),
|
||||||
|
upload_uuid)
|
||||||
|
response = self.conduct(session, 'GET', status_url, expected_status=204,
|
||||||
|
headers=headers)
|
||||||
|
assert response.headers['Docker-Upload-UUID'] == upload_uuid
|
||||||
|
assert response.headers['Range'] == "bytes=0-%s" % end_byte
|
||||||
|
|
||||||
|
if options.cancel_blob_upload:
|
||||||
|
self.conduct(session, 'DELETE', location, params=dict(digest=checksum),
|
||||||
|
expected_status=204, headers=headers)
|
||||||
|
|
||||||
|
# Ensure the upload was canceled.
|
||||||
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name),
|
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name),
|
||||||
upload_uuid)
|
upload_uuid)
|
||||||
response = self.conduct(session, 'GET', status_url, expected_status=204, headers=headers)
|
self.conduct(session, 'GET', status_url, expected_status=404, headers=headers)
|
||||||
assert response.headers['Docker-Upload-UUID'] == upload_uuid
|
return
|
||||||
assert response.headers['Range'] == "bytes=0-%s" % end_byte
|
|
||||||
|
|
||||||
if options.cancel_blob_upload:
|
# Finish the layer upload with a PUT.
|
||||||
self.conduct(session, 'DELETE', location, params=dict(digest=checksum), expected_status=204,
|
response = self.conduct(session, 'PUT', location, params=dict(digest=checksum),
|
||||||
headers=headers)
|
expected_status=201, headers=headers)
|
||||||
|
assert response.headers['Docker-Content-Digest'] == checksum
|
||||||
# Ensure the upload was canceled.
|
|
||||||
status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name), upload_uuid)
|
|
||||||
self.conduct(session, 'GET', status_url, expected_status=404, headers=headers)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Finish the layer upload with a PUT.
|
|
||||||
response = self.conduct(session, 'PUT', location, params=dict(digest=checksum),
|
|
||||||
expected_status=201, headers=headers)
|
|
||||||
assert response.headers['Docker-Content-Digest'] == checksum
|
|
||||||
|
|
||||||
# Ensure the layer exists now.
|
# Ensure the layer exists now.
|
||||||
response = self.conduct(session, 'HEAD',
|
response = self.conduct(session, 'HEAD',
|
||||||
|
@ -258,7 +276,7 @@ class V2Protocol(RegistryProtocol):
|
||||||
def delete(self, session, namespace, repo_name, tag_names, credentials=None,
|
def delete(self, session, namespace, repo_name, tag_names, credentials=None,
|
||||||
expected_failure=None, options=None):
|
expected_failure=None, options=None):
|
||||||
options = options or ProtocolOptions()
|
options = options or ProtocolOptions()
|
||||||
scopes = options.scopes or ['*']
|
scopes = options.scopes or ['repository:%s:*' % self.repo_name(namespace, repo_name)]
|
||||||
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
||||||
|
|
||||||
# Ping!
|
# Ping!
|
||||||
|
@ -284,7 +302,7 @@ class V2Protocol(RegistryProtocol):
|
||||||
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
|
def pull(self, session, namespace, repo_name, tag_names, images, credentials=None,
|
||||||
expected_failure=None, options=None):
|
expected_failure=None, options=None):
|
||||||
options = options or ProtocolOptions()
|
options = options or ProtocolOptions()
|
||||||
scopes = options.scopes or ['pull']
|
scopes = options.scopes or ['repository:%s:pull' % self.repo_name(namespace, repo_name)]
|
||||||
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
tag_names = [tag_names] if isinstance(tag_names, str) else tag_names
|
||||||
|
|
||||||
# Ping!
|
# Ping!
|
||||||
|
|
|
@ -51,6 +51,7 @@ class Failures(Enum):
|
||||||
UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
|
UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type'
|
||||||
INVALID_BLOB = 'invalid-blob'
|
INVALID_BLOB = 'invalid-blob'
|
||||||
NAMESPACE_DISABLED = 'namespace-disabled'
|
NAMESPACE_DISABLED = 'namespace-disabled'
|
||||||
|
UNAUTHORIZED_FOR_MOUNT = 'unauthorized-for-mount'
|
||||||
|
|
||||||
|
|
||||||
class ProtocolOptions(object):
|
class ProtocolOptions(object):
|
||||||
|
@ -62,6 +63,7 @@ class ProtocolOptions(object):
|
||||||
self.chunks_for_upload = None
|
self.chunks_for_upload = None
|
||||||
self.skip_head_checks = False
|
self.skip_head_checks = False
|
||||||
self.manifest_content_type = None
|
self.manifest_content_type = None
|
||||||
|
self.mount_blobs = None
|
||||||
|
|
||||||
|
|
||||||
@add_metaclass(ABCMeta)
|
@add_metaclass(ABCMeta)
|
||||||
|
|
|
@ -796,6 +796,43 @@ def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session
|
||||||
tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
|
tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [
|
||||||
|
# Successful mount, same namespace.
|
||||||
|
('devtable', 'devtable', 'baserepo', 'devtable/baserepo', None),
|
||||||
|
|
||||||
|
# Successful mount, cross namespace.
|
||||||
|
('devtable', 'buynlarge', 'baserepo', 'buynlarge/baserepo', None),
|
||||||
|
|
||||||
|
# Unsuccessful mount, unknown repo.
|
||||||
|
('devtable', 'devtable', 'baserepo', 'unknown/repohere', Failures.UNAUTHORIZED_FOR_MOUNT),
|
||||||
|
|
||||||
|
# Unsuccessful mount, no access.
|
||||||
|
('public', 'public', 'baserepo', 'public/baserepo', Failures.UNAUTHORIZED_FOR_MOUNT),
|
||||||
|
])
|
||||||
|
def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, expected_failure,
|
||||||
|
manifest_protocol, pusher, puller, basic_images, liveserver_session,
|
||||||
|
app_reloader):
|
||||||
|
# Push an image so we can attempt to mount it.
|
||||||
|
pusher.push(liveserver_session, push_namespace, push_repo, 'latest', basic_images,
|
||||||
|
credentials=(push_user, 'password'))
|
||||||
|
|
||||||
|
# Push again, trying to mount the image layer(s) from the mount repo.
|
||||||
|
options = ProtocolOptions()
|
||||||
|
options.scopes = ['repository:devtable/newrepo:push,pull',
|
||||||
|
'repository:%s:pull' % (mount_repo_name)]
|
||||||
|
options.mount_blobs = {image.id: mount_repo_name for image in basic_images}
|
||||||
|
|
||||||
|
manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
|
||||||
|
credentials=('devtable', 'password'),
|
||||||
|
options=options,
|
||||||
|
expected_failure=expected_failure)
|
||||||
|
|
||||||
|
if expected_failure is None:
|
||||||
|
# Pull to ensure it worked.
|
||||||
|
puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
|
||||||
|
credentials=('devtable', 'password'))
|
||||||
|
|
||||||
|
|
||||||
def get_robot_password(api_caller):
|
def get_robot_password(api_caller):
|
||||||
api_caller.conduct_auth('devtable', 'password')
|
api_caller.conduct_auth('devtable', 'password')
|
||||||
resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
|
resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
|
||||||
|
|
Reference in a new issue