2caaf84f31
We will now cache the results of the catalog for 60s and not hit the database at all if cached
1018 lines
41 KiB
Python
1018 lines
41 KiB
Python
# pylint: disable=W0401, W0621, W0613, W0614, R0913
|
|
import hashlib
|
|
import tarfile
|
|
|
|
from cStringIO import StringIO
|
|
|
|
import binascii
|
|
import bencode
|
|
import resumablehashlib
|
|
|
|
from test.fixtures import *
|
|
from test.registry.liveserverfixture import *
|
|
from test.registry.fixtures import *
|
|
from test.registry.protocol_fixtures import *
|
|
|
|
from test.registry.protocols import Failures, Image, layer_bytes_for_contents, ProtocolOptions
|
|
|
|
from app import instance_keys
|
|
from data.model.tag import list_repository_tags
|
|
from util.security.registry_jwt import decode_bearer_header
|
|
from util.timedeltastring import convert_to_timedelta
|
|
|
|
|
|
def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_reloader):
    """ Test: Round-trip the basic images through a new repository.

    The images are pushed to `devtable/newrepo:latest` and then pulled back using the
    same credentials.
    """
    namespace, repo_name, tag_name = 'devtable', 'newrepo', 'latest'
    auth = ('devtable', 'password')

    # Push into the new repository.
    pusher.push(liveserver_session, namespace, repo_name, tag_name, basic_images,
                credentials=auth)

    # Pull the same tag back to verify the push.
    puller.pull(liveserver_session, namespace, repo_name, tag_name, basic_images,
                credentials=auth)
|
|
|
|
|
|
def test_basic_push_pull_by_manifest(manifest_protocol, basic_images, liveserver_session,
                                     app_reloader):
    """ Test: Push images to a new repository, then pull them back by manifest digest. """
    auth = ('devtable', 'password')

    # Push a new repository, keeping the result so the manifest digests can be read.
    push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                         basic_images, credentials=auth)

    # Collect the digest of every pushed manifest and pull by those digests
    # instead of by tag name.
    pushed_digests = []
    for pushed_manifest in push_result.manifests.values():
        pushed_digests.append(str(pushed_manifest.digest))

    manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', pushed_digests,
                           basic_images, credentials=auth)
|
|
|
|
|
|
def test_push_invalid_credentials(pusher, basic_images, liveserver_session, app_reloader):
    """ Test: A push made with a wrong password is expected to fail as UNAUTHENTICATED. """
    bad_auth = ('devtable', 'notcorrectpassword')

    pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHENTICATED, credentials=bad_auth)
|
|
|
|
|
|
def test_pull_invalid_credentials(puller, basic_images, liveserver_session, app_reloader):
    """ Test: A pull made with a wrong password is expected to fail as UNAUTHENTICATED. """
    bad_auth = ('devtable', 'notcorrectpassword')

    puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHENTICATED, credentials=bad_auth)
|
|
|
|
|
|
def test_push_pull_formerly_bad_repo_name(pusher, puller, basic_images, liveserver_session,
                                          app_reloader):
    """ Test: Push and pull against a new repository whose name (`foo.bar`) formerly
        failed.
    """
    auth = ('devtable', 'password')
    dotted_repo = 'foo.bar'

    # Push to the dotted repository name.
    pusher.push(liveserver_session, 'devtable', dotted_repo, 'latest', basic_images,
                credentials=auth)

    # Pull it back to verify.
    puller.pull(liveserver_session, 'devtable', dotted_repo, 'latest', basic_images,
                credentials=auth)
|
|
|
|
|
|
def test_application_repo(pusher, puller, basic_images, liveserver_session, app_reloader,
                          registry_server_executor, liveserver):
    """ Test: Pushing to, or pulling from, an *application* repository is expected to fail
        with the APP_REPOSITORY failure.
    """
    auth = ('devtable', 'password')

    # Create the application repository that both operations will target.
    registry_server_executor.on(liveserver).create_app_repository('devtable', 'someapprepo')

    # The push must be rejected.
    pusher.push(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
                expected_failure=Failures.APP_REPOSITORY, credentials=auth)

    # The pull must be rejected as well.
    puller.pull(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
                expected_failure=Failures.APP_REPOSITORY, credentials=auth)
|
|
|
|
|
|
def test_middle_layer_different_sha(manifest_protocol, v1_protocol, liveserver_session,
                                    app_reloader):
    """ Test: Re-pushing a 3-layer image that keeps the *same* V1 IDs but whose middle layer
        has different bytes must produce a new ID for the leaf layer, because the layers
        now point to different "images".
    """
    auth = ('devtable', 'password')

    base = Image(id='baseimage', parent_id=None, size=None,
                 bytes=layer_bytes_for_contents('base'))
    middle = Image(id='middleimage', parent_id='baseimage', size=None,
                   bytes=layer_bytes_for_contents('middle'))
    leaf = Image(id='leafimage', parent_id='middleimage', size=None,
                 bytes=layer_bytes_for_contents('leaf'))
    first_images = [base, middle, leaf]

    # Establish the baseline: push and pull the original chain and confirm the IDs survive.
    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', first_images,
                           credentials=auth)
    baseline_pull = manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
                                           first_images, credentials=auth)
    baseline_manifest = baseline_pull.manifests['latest']
    assert {image.id for image in first_images} == set(baseline_manifest.image_ids)
    assert baseline_pull.image_ids['latest'] == 'leafimage'

    # Same chain and same IDs, but the middle layer now carries different *bytes*.
    changed_middle = Image(id='middleimage', parent_id='baseimage', size=None,
                           bytes=layer_bytes_for_contents('different middle bytes'))
    second_images = [base, changed_middle, leaf]

    # Push and pull the changed chain; the IDs for the middle and leaf layers are
    # expected to be synthesized.
    options = ProtocolOptions()
    options.munge_shas = True
    options.skip_head_checks = True

    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', second_images,
                           credentials=auth, options=options)
    changed_pull = v1_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
                                    second_images, credentials=auth, options=options)

    assert changed_pull.image_ids['latest'] != 'leafimage'
|
|
|
|
|
|
def add_robot(api_caller, _):
    """ Credentials factory: returns `(username, token)` for the `buynlarge+ownerbot` robot.

    Authenticates `api_caller` as devtable, fetches the robot via the API and reads its
    token from the JSON response. The second argument is unused.
    """
    api_caller.conduct_auth('devtable', 'password')
    robot_info = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot').json()
    return ('buynlarge+ownerbot', robot_info['token'])
|
|
|
|
|
|
def add_token(_, executor):
    """ Credentials factory: returns `('$token', <token text>)`.

    The token text is the `.text` of the response from `executor.add_token()`.
    The first argument is unused.
    """
    token_response = executor.add_token()
    return ('$token', token_response.text)
|
|
|
|
|
|
@pytest.mark.parametrize('credentials, namespace, expected_performer', [
    (('devtable', 'password'), 'devtable', 'devtable'),
    (add_robot, 'buynlarge', 'buynlarge+ownerbot'),
    (('$oauthtoken', 'test'), 'devtable', 'devtable'),
    (('$app', 'test'), 'devtable', 'devtable'),
    (add_token, 'devtable', None),
])
def test_push_pull_logging(credentials, namespace, expected_performer, pusher, puller, basic_images,
                           liveserver_session, liveserver, api_caller, app_reloader,
                           registry_server_executor):
    """ Test: A push and a pull each add an entry to the repository's logs.

    `credentials` is either a `(username, password)` tuple or a callable taking
    `(api_caller, executor)` and returning such a tuple. When `expected_performer` is
    None the performer of the log entry is not checked.
    """
    logs_url = '/api/v1/repository/%s/newrepo/logs' % namespace

    def check_first_log(expected_count, expected_kind):
        # Fetch the repository logs and validate the first returned entry.
        entries = api_caller.get(logs_url).json()['logs']
        assert len(entries) == expected_count

        first_entry = entries[0]
        assert first_entry['kind'] == expected_kind
        assert first_entry['metadata']['namespace'] == namespace
        assert first_entry['metadata']['repo'] == 'newrepo'

        if expected_performer is not None:
            assert first_entry['performer']['name'] == expected_performer

    # Create the repository up front with a throwaway image.
    seed_images = [Image(id='startimage', parent_id=None, size=None,
                         bytes=layer_bytes_for_contents('start image'))]
    pusher.push(liveserver_session, namespace, 'newrepo', 'latest', seed_images,
                credentials=('devtable', 'password'))

    # Resolve the credentials only now: some credentials creation code adds the new
    # entity to the repository, so the repository must already exist.
    if isinstance(credentials, tuple):
        auth = credentials
    else:
        auth = credentials(api_caller, registry_server_executor.on(liveserver))

    # Push with the credentials under test.
    pusher.push(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
                credentials=auth)

    # The logs are read as devtable; the seed push plus this push give two entries.
    api_caller.conduct_auth('devtable', 'password')
    check_first_log(2, 'push_repo')

    # Pull with the same credentials, which should add a third entry.
    puller.pull(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
                credentials=auth)
    check_first_log(3, 'pull_repo')
|
|
|
|
|
|
def test_pull_publicrepo_anonymous(pusher, puller, basic_images, liveserver_session,
                                   app_reloader, api_caller, liveserver):
    """ Test: A repository can be pulled without credentials once it has been made public. """
    owner_auth = ('public', 'password')

    # Create the repository under the `public` user so there is something to pull.
    pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                credentials=owner_auth)

    # While the repository is still private, an anonymous pull must be refused.
    puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHORIZED)

    # A different (non-owner) user must be refused too.
    puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHORIZED,
                credentials=('devtable', 'password'))

    # Switch the repository's visibility to public.
    api_caller.conduct_auth('public', 'password')
    api_caller.change_repo_visibility('public', 'newrepo', 'public')

    # The anonymous pull should now succeed.
    puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images)
|
|
|
|
|
|
def test_pull_publicrepo_no_anonymous_access(pusher, puller, basic_images, liveserver_session,
                                             app_reloader, api_caller, liveserver,
                                             registry_server_executor):
    """ Test: With the ANONYMOUS_ACCESS feature flag off, an anonymous pull of a public
        repository is refused while an authenticated pull succeeds.
    """
    owner_auth = ('public', 'password')
    other_auth = ('devtable', 'password')

    # Create the repository under the `public` user so there is something to pull.
    pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                credentials=owner_auth)

    # While the repository is still private, an anonymous pull must be refused.
    puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHORIZED)

    # A different (non-owner) user must be refused too.
    puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHORIZED, credentials=other_auth)

    # Switch the repository's visibility to public.
    api_caller.conduct_auth('public', 'password')
    api_caller.change_repo_visibility('public', 'newrepo', 'public')

    with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
        # The repository is public now, but anonymous access is switched off, so the
        # anonymous pull must still fail.
        puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                    expected_failure=Failures.ANONYMOUS_NOT_ALLOWED)

        # The non-owner user, being authenticated, should now succeed.
        puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
                    credentials=other_auth)
|
|
|
|
|
|
def test_basic_organization_flow(pusher, puller, basic_images, liveserver_session, app_reloader):
    """ Test: Push to a new organization repository and check which members can pull it. """
    # The `creator` user pushes the new repository into the organization.
    pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
                credentials=('creator', 'password'))

    # Both the creator and the admin (devtable) must be able to pull it.
    for allowed_user in ('creator', 'devtable'):
        puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
                    credentials=(allowed_user, 'password'))

    # The reader must *not* be able to pull it.
    puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
                expected_failure=Failures.UNAUTHORIZED,
                credentials=('reader', 'password'))
|
|
|
|
|
|
def test_library_support(pusher, puller, basic_images, liveserver_session, app_reloader):
    """ Test: Push with an empty namespace (the implicit library namespace), then pull via
        both the empty namespace and the explicit `library` namespace.
    """
    auth = ('devtable', 'password')

    # Push using the empty (implicit) namespace.
    pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
                credentials=auth)

    # Pull first through the implicit namespace, then through `library` explicitly.
    for pull_namespace in ('', 'library'):
        puller.pull(liveserver_session, pull_namespace, 'newrepo', 'latest', basic_images,
                    credentials=auth)
|
|
|
|
|
|
def test_library_namespace_with_support_disabled(pusher, puller, basic_images, liveserver_session,
                                                 app_reloader, liveserver,
                                                 registry_server_executor):
    """ Test: The explicit `library` namespace can still be pushed to and pulled from while
        the LIBRARY_SUPPORT feature flag is turned off.
    """
    auth = ('devtable', 'password')
    executor = registry_server_executor.on(liveserver)

    with FeatureFlagValue('LIBRARY_SUPPORT', False, executor):
        # Push a new repository into the explicit namespace.
        pusher.push(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
                    credentials=auth)

        # Pull it back from the explicit namespace to verify.
        puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
                    credentials=auth)
|
|
|
|
|
|
def test_push_library_with_support_disabled(pusher, basic_images, liveserver_session,
                                            app_reloader, liveserver,
                                            registry_server_executor):
    """ Test: Pushing with an empty (implicit library) namespace while LIBRARY_SUPPORT is
        turned off is expected to fail with DISALLOWED_LIBRARY_NAMESPACE.
    """
    auth = ('devtable', 'password')
    executor = registry_server_executor.on(liveserver)

    with FeatureFlagValue('LIBRARY_SUPPORT', False, executor):
        # The push uses the empty namespace and must be rejected.
        pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
                    expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE,
                    credentials=auth)
|
|
|
|
|
|
def test_pull_library_with_support_disabled(puller, basic_images, liveserver_session,
                                            app_reloader, liveserver,
                                            registry_server_executor):
    """ Test: Pulling with an empty (implicit library) namespace while LIBRARY_SUPPORT is
        turned off is expected to fail with DISALLOWED_LIBRARY_NAMESPACE.
    """
    auth = ('devtable', 'password')
    executor = registry_server_executor.on(liveserver)

    with FeatureFlagValue('LIBRARY_SUPPORT', False, executor):
        # The pull uses the empty namespace and must be rejected.
        puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
                    expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE,
                    credentials=auth)
|
|
|
|
|
|
def test_image_replication(pusher, basic_images, liveserver_session, app_reloader, liveserver,
                           registry_server_executor):
    """ Test: With STORAGE_REPLICATION turned on, a storage replication entry can be
        retrieved for every pushed image.
    """
    auth = ('devtable', 'password')

    with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)):
        pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                    credentials=auth)

        # Every pushed image must have a replication entry; the executor answers 'OK'.
        for pushed_image in basic_images:
            executor = registry_server_executor.on(liveserver)
            entry_response = executor.get_storage_replication_entry(pushed_image.id)
            assert entry_response.text == 'OK'
|
|
|
|
|
|
def test_push_reponame_with_slashes(pusher, basic_images, liveserver_session, app_reloader):
    """ Test: Pushing to a repository name containing a slash is expected to fail with
        INVALID_REPOSITORY, as such names are not supported.
    """
    auth = ('devtable', 'password')
    slashed_repo = 'some/slash'

    pusher.push(liveserver_session, 'devtable', slashed_repo, 'latest', basic_images,
                expected_failure=Failures.INVALID_REPOSITORY,
                credentials=auth)
|
|
|
|
|
|
@pytest.mark.parametrize('tag_name, expected_failure', [
    # Tag names expected to be accepted.
    ('l', None),
    ('1', None),
    ('x' * 128, None),

    # Tag names expected to be rejected.
    ('', Failures.MISSING_TAG),
    ('x' * 129, Failures.INVALID_TAG),
    ('.fail', Failures.INVALID_TAG),
    ('-fail', Failures.INVALID_TAG),
])
def test_tag_validaton(tag_name, expected_failure, pusher, basic_images, liveserver_session,
                       app_reloader):
    """ Test: Push under a variety of tag names and check each one succeeds or fails as the
        parametrization expects (`expected_failure` of None means success).
    """
    auth = ('devtable', 'password')

    pusher.push(liveserver_session, 'devtable', 'newrepo', tag_name, basic_images,
                expected_failure=expected_failure,
                credentials=auth)
|
|
|
|
|
|
def test_invalid_parent(pusher, liveserver_session, app_reloader):
    """ Test: Pushing an image whose parent is not part of the push is expected to fail
        with INVALID_IMAGES.
    """
    auth = ('devtable', 'password')

    # `parentimage` is referenced as the parent but is not included in the list.
    orphan = Image(id='childimage', parent_id='parentimage', size=None,
                   bytes=layer_bytes_for_contents('child'))

    pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', [orphan],
                expected_failure=Failures.INVALID_IMAGES,
                credentials=auth)
|
|
|
|
|
|
def test_wrong_image_order(pusher, liveserver_session, app_reloader):
    """ Test: Pushing an image whose layers are listed child-before-parent is expected to
        fail with INVALID_IMAGES.
    """
    auth = ('devtable', 'password')

    child = Image(id='childimage', parent_id='parentimage', size=None,
                  bytes=layer_bytes_for_contents('child'))
    parent = Image(id='parentimage', parent_id=None, size=None,
                   bytes=layer_bytes_for_contents('parent'))

    # Deliberately list the child ahead of its parent.
    misordered = [child, parent]

    pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', misordered,
                expected_failure=Failures.INVALID_IMAGES,
                credentials=auth)
|
|
|
|
|
|
@pytest.mark.parametrize('labels', [
    # Basic labels.
    [('foo', 'bar', 'text/plain'), ('baz', 'meh', 'text/plain')],

    # Theoretically invalid, but allowed when pushed via registry protocol.
    [('theoretically-invalid--label', 'foo', 'text/plain')],

    # JSON label.
    [('somejson', '{"some": "json"}', 'application/json'), ('plain', '', 'text/plain')],

    # JSON-esque (but not valid JSON) labels.
    [('foo', '[hello world]', 'text/plain'), ('bar', '{wassup?!}', 'text/plain')],
])
def test_labels(labels, manifest_protocol, liveserver_session, api_caller, app_reloader):
    """ Test: After pushing an image whose config carries labels, the manifest labels API
        returns exactly those labels.

    Each entry of `labels` is a `(key, value, expected_media_type)` triple.
    """
    label_config = {'Labels': dict((key, value) for (key, value, _) in labels)}
    images = [
        Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
              config=label_config),
    ]

    auth = ('devtable', 'password')
    push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                         images, credentials=auth)

    # Look up the labels recorded against the pushed manifest's digest.
    digest = push_result.manifests['latest'].digest
    api_caller.conduct_auth('devtable', 'password')

    labels_url = '/api/v1/repository/devtable/newrepo/manifest/%s/labels' % digest
    labels_found = api_caller.get(labels_url).json()['labels']
    assert len(labels_found) == len(labels)

    # Index the returned labels by key and compare the key sets.
    found_by_key = {}
    for found in labels_found:
        found_by_key[found['key']] = found

    assert set(images[0].config['Labels'].keys()) == set(found_by_key.keys())

    # Every label must be attributed to the manifest and carry the expected media type.
    for key, _, media_type in labels:
        found = found_by_key[key]
        assert found['source_type'] == 'manifest'
        assert found['media_type'] == media_type
|
|
|
|
|
|
@pytest.mark.parametrize('label_value, expected_expiration', [
    ('1d', True),
    ('1h', True),
    ('2w', True),

    ('1g', False),
    ('something', False),
])
def test_expiration_label(label_value, expected_expiration, manifest_protocol, liveserver_session,
                          api_caller, app_reloader):
    """ Test: A tag pushed with a valid `quay.expires-after` label gets an `end_ts` equal to
        its `start_ts` plus the labelled duration; with an invalid duration no `end_ts`
        is set.
    """
    expiring_image = Image(id='theimage', parent_id=None,
                           bytes=layer_bytes_for_contents('image'),
                           config={'Labels': {'quay.expires-after': label_value}})

    auth = ('devtable', 'password')
    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                           [expiring_image], credentials=auth)

    api_caller.conduct_auth('devtable', 'password')

    # Inspect the first tag returned for the repository.
    tags_response = api_caller.get('/api/v1/repository/devtable/newrepo/tag')
    tag_data = tags_response.json()['tags'][0]

    if not expected_expiration:
        assert tag_data.get('end_ts') is None
        return

    expected_seconds = convert_to_timedelta(label_value).total_seconds()
    assert tag_data['end_ts'] == tag_data['start_ts'] + expected_seconds
|
|
|
|
|
|
@pytest.mark.parametrize('content_type', [
    'application/vnd.oci.image.manifest.v1+json',
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.foo.bar',
])
def test_unsupported_manifest_content_type(content_type, manifest_protocol, basic_images,
                                           liveserver_session, app_reloader):
    """ Test: Pushing a manifest declared with one of the parametrized content types is
        expected to fail with UNSUPPORTED_CONTENT_TYPE.
    """
    auth = ('devtable', 'password')

    # Make the protocol send the manifest under the content type being tested.
    options = ProtocolOptions()
    options.manifest_content_type = content_type

    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                           expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE,
                           options=options,
                           credentials=auth)
|
|
|
|
|
|
def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_session, app_reloader):
    """ Test: Pushing a manifest with invalid blob references is expected to fail with
        INVALID_BLOB.
    """
    auth = ('devtable', 'password')

    # Ask the protocol to produce a manifest with invalid blob references.
    options = ProtocolOptions()
    options.manifest_invalid_blob_references = True

    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                           expected_failure=Failures.INVALID_BLOB,
                           options=options,
                           credentials=auth)
|
|
|
|
|
|
def test_delete_tag(manifest_protocol, puller, basic_images, liveserver_session,
                    app_reloader):
    """ Test: After pushing tags `one` and `two` and deleting `one` by its manifest digest,
        pulling `one` fails with UNKNOWN_TAG while `two` can still be pulled.
    """
    auth = ('devtable', 'password')

    # Push both tags in a single push.
    push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo',
                                         ['one', 'two'], basic_images, credentials=auth)

    # Remove tag `one`, addressing it by digest.
    digest_of_one = push_result.manifests['one'].digest
    manifest_protocol.delete(liveserver_session, 'devtable', 'newrepo', digest_of_one,
                             credentials=auth)

    # Tag `one` must no longer be pullable.
    puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images,
                expected_failure=Failures.UNKNOWN_TAG,
                credentials=auth)

    # Tag `two` must still work.
    puller.pull(liveserver_session, 'devtable', 'newrepo', 'two', basic_images,
                credentials=auth)
|
|
|
|
|
|
def test_cancel_upload(manifest_protocol, basic_images, liveserver_session, app_reloader):
    """ Test: Cancelation of blob uploads.

    Pushes with `ProtocolOptions.cancel_blob_upload` enabled so that the protocol
    exercises its upload cancelation path.
    """
    credentials = ('devtable', 'password')

    options = ProtocolOptions()
    options.cancel_blob_upload = True

    # The options must be handed to the push; without them the cancelation flag is
    # never seen by the protocol and this degenerates into a plain push.
    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                           basic_images, credentials=credentials, options=options)
|
|
|
|
|
|
def test_blob_caching(manifest_protocol, basic_images, liveserver_session, app_reloader,
                      liveserver, registry_server_executor):
    """ Test: Once a tag has been pulled, its blobs can still be fetched after the server's
        database has been broken, i.e. they are served from cache.
    """
    auth = ('devtable', 'password')

    # Push a tag, keeping the result for its manifest layers and request headers.
    push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                         basic_images, credentials=auth)

    # The first pull primes the cache.
    manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                           credentials=auth)

    # Cut the server off from its database.
    registry_server_executor.on(liveserver).break_database()

    # Fetch every blob directly. A working cache answers 200; a broken cache would try
    # to reach the database and fail.
    for layer in push_result.manifests['latest'].layers:
        blob_url = '/v2/devtable/newrepo/blobs/%s' % str(layer.digest)
        blob_response = liveserver_session.get(blob_url, headers=push_result.headers)
        assert blob_response.status_code == 200
|
|
|
|
|
|
@pytest.mark.parametrize('chunks', [
    # Two chunks.
    [(0, 100), (100, None)],

    # Multiple chunks.
    [(0, 10), (10, 20), (20, None)],
    [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, None)],

    # Overlapping chunks.
    [(0, 1024), (10, None)],
])
def test_chunked_blob_uploading(chunks, random_layer_data, manifest_protocol, puller,
                                liveserver_session, app_reloader):
    """ Test: Push a single-layer image with its blob uploaded in the given chunks, then
        pull it back.

    Each chunk is a `(start_byte, end_byte)` pair; a falsy `end_byte` stands for the
    length of the layer data.
    """
    auth = ('devtable', 'password')

    # Replace open-ended chunk ends with the actual data length.
    resolved_chunks = [(start_byte, end_byte or len(random_layer_data))
                       for start_byte, end_byte in chunks]

    images = [Image(id='theimage', parent_id=None, bytes=random_layer_data)]

    options = ProtocolOptions()
    options.chunks_for_upload = resolved_chunks

    # Push using the requested chunking.
    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                           images, credentials=auth, options=options)

    # Pull to verify the image was created.
    puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images,
                credentials=auth)
|
|
|
|
|
|
def test_chunked_uploading_mismatched_chunks(manifest_protocol, random_layer_data,
                                             liveserver_session, app_reloader):
    """ Test: Attempt a chunked blob upload in which part of the data is missing. """
    auth = ('devtable', 'password')

    images = [Image(id='theimage', parent_id=None, bytes=random_layer_data)]

    # Byte #100 is skipped: the first chunk ends at 100 and the second starts at 101.
    # NOTE(review): the trailing 416 on the second chunk is presumably the HTTP status
    # the protocol expects for that chunk -- confirm against the protocol helper.
    gapped_chunks = [(0, 100), (101, len(random_layer_data), 416)]

    options = ProtocolOptions()
    options.chunks_for_upload = gapped_chunks

    # Attempt the push, with the chunked upload failing.
    manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                           images, credentials=auth, options=options)
|
|
|
|
|
|
def test_pull_disabled_namespace(pusher, puller, basic_images, liveserver_session,
                                 app_reloader, liveserver, registry_server_executor):
    """ Test: Pulling a repository whose namespace has been disabled is expected to fail
        with NAMESPACE_DISABLED.
    """
    auth = ('devtable', 'password')
    namespace, repo_name = 'buynlarge', 'someneworgrepo'

    # Push a new repository while the namespace is still enabled.
    pusher.push(liveserver_session, namespace, repo_name, 'latest', basic_images,
                credentials=auth)

    # Disable the namespace.
    registry_server_executor.on(liveserver).disable_namespace('buynlarge')

    # The pull must now be refused.
    puller.pull(liveserver_session, namespace, repo_name, 'latest', basic_images,
                expected_failure=Failures.NAMESPACE_DISABLED, credentials=auth)
|
|
|
|
|
|
def test_push_disabled_namespace(pusher, basic_images, liveserver_session,
                                 app_reloader, liveserver, registry_server_executor):
    """ Test: Pushing a repository into a disabled namespace is expected to fail with
        NAMESPACE_DISABLED.
    """
    auth = ('devtable', 'password')

    # Disable the namespace before anything is pushed.
    registry_server_executor.on(liveserver).disable_namespace('buynlarge')

    # The push must be refused.
    pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images,
                expected_failure=Failures.NAMESPACE_DISABLED, credentials=auth)
|
|
|
|
|
|
def test_private_catalog_no_access(v2_protocol, liveserver_session, app_reloader, liveserver,
                                   registry_server_executor):
    """ Test: With PUBLIC_CATALOG turned off, an anonymous catalog request returns nothing
        even though the database has been broken, i.e. it needs no database connection.
    """
    with FeatureFlagValue('PUBLIC_CATALOG', False, registry_server_executor.on(liveserver)):
        # Cut the server off from its database before asking for the catalog.
        registry_server_executor.on(liveserver).break_database()

        catalog_entries = v2_protocol.catalog(liveserver_session)
        assert not catalog_entries
|
|
|
|
|
|
@pytest.mark.parametrize('public_catalog, credentials, expected_repos', [
    # No public access and no credentials => No results.
    (False, None, None),

    # Public access and no credentials => public repositories.
    (True, None, ['public/publicrepo']),

    # Private creds => private repositories.
    (False, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
    (True, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
])
@pytest.mark.parametrize('page_size', [
    1,
    2,
    10,
    50,
    100,
])
def test_catalog(public_catalog, credentials, expected_repos, page_size, v2_protocol,
                 liveserver_session, app_reloader, liveserver, registry_server_executor):
    """ Test: Retrieve the V2 catalog for each combination of PUBLIC_CATALOG setting,
        credentials and page size.

    `expected_repos` of None means the catalog must be empty; otherwise every listed
    repository must be present in the results.
    """
    executor = registry_server_executor.on(liveserver)
    with FeatureFlagValue('PUBLIC_CATALOG', public_catalog, executor):
        results = v2_protocol.catalog(liveserver_session, page_size=page_size,
                                      credentials=credentials, namespace='devtable',
                                      repo_name='simple')

    if expected_repos is not None:
        assert set(expected_repos).issubset(set(results))
    else:
        assert len(results) == 0
|
|
|
|
|
|
def test_catalog_caching(v2_protocol, basic_images, liveserver_session, app_reloader,
                         liveserver, registry_server_executor):
    """ Test: A catalog call repeated after the database has been broken returns the same
        results as the first call, i.e. the catalog is served from cache.
    """
    auth = ('devtable', 'password')

    # The first catalog call primes the cache.
    fresh_results = v2_protocol.catalog(liveserver_session, credentials=auth,
                                        namespace='devtable', repo_name='simple')

    # Obtain a bearer token while the database is still reachable.
    token, _ = v2_protocol.auth(liveserver_session, auth, 'devtable', 'simple')

    # Cut the server off from its database.
    registry_server_executor.on(liveserver).break_database()

    # The second call, made with the bearer token, should be answered from cache.
    cached_results = v2_protocol.catalog(liveserver_session, bearer_token=token)
    assert len(cached_results) == len(fresh_results)
    assert set(cached_results) == set(fresh_results)
|
|
|
|
|
|
@pytest.mark.parametrize('username, namespace, repository', [
    ('devtable', 'devtable', 'simple'),
    ('devtable', 'devtable', 'gargantuan'),
    ('public', 'public', 'publicrepo'),
    ('devtable', 'buynlarge', 'orgrepo'),
])
@pytest.mark.parametrize('page_size', [
    1,
    2,
    10,
    50,
    100,
])
def test_tags(username, namespace, repository, page_size, v2_protocol, liveserver_session,
              app_reloader, liveserver, registry_server_executor):
    """ Test: The tags returned by the V2 protocol for a repository match the tag names
        reported by `list_repository_tags`, for every page size.
    """
    auth = (username, 'password')
    found_tags = v2_protocol.tags(liveserver_session, page_size=page_size, credentials=auth,
                                  namespace=namespace, repo_name=repository)

    # Build the reference list of tag names from the data model.
    expected_tags = []
    for tag in list_repository_tags(namespace, repository):
        expected_tags.append(tag.name)

    assert len(found_tags) == len(expected_tags)
    assert set(found_tags) == set(expected_tags)
|
|
|
|
|
|
def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
                      registry_server_executor, app_reloader):
  """ Test: A torrent can be retrieved for every blob of a pushed image. """
  credentials = ('devtable', 'password')

  # Upload the image whose blobs will be requested as torrents.
  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
              credentials=credentials)

  # Torrents require direct download support to be enabled on the server.
  registry_server_executor.on(liveserver).set_supports_direct_download(True)

  # Request and validate one torrent per layer blob.
  for layer in basic_images:
    blob_digest = 'sha256:' + hashlib.sha256(layer.bytes).hexdigest()
    torrent_response = liveserver_session.get(
      '/c1/torrent/devtable/newrepo/blobs/%s' % blob_digest, auth=credentials)
    torrent = bencode.bdecode(torrent_response.content)

    # The `url-list` entry must carry the expected download URL.
    assert torrent['url-list'] == 'http://somefakeurl?goes=here'

    # The torrent must include piece hashes and an announce entry.
    assert torrent.get('info', {}).get('pieces') is not None
    assert torrent.get('announce') is not None

    # The piece data must equal the SHA-1 of the layer's bytes.
    hasher = resumablehashlib.sha1()
    hasher.update(layer.bytes)

    expected = binascii.hexlify(hasher.digest())
    found = binascii.hexlify(torrent['info']['pieces'])
    assert expected == found
|
|
|
|
|
|
@pytest.mark.parametrize('use_estimates', [
  False,
  True,
])
def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session,
                         liveserver, registry_server_executor, app_reloader):
  """ Test: A pushed image can be pulled in squashed form and has the expected layout. """
  credentials = ('devtable', 'password')

  # Upload the image that will be squashed.
  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images,
              credentials=credentials)

  if use_estimates:
    # Wipe the stored uncompressed size of every image, to ensure that we estimate instead.
    for sized_image in sized_images:
      registry_server_executor.on(liveserver).clear_uncompressed_size(sized_image.id)

  # Download the squashed image and open it as a tar archive.
  squashed = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
  archive = tarfile.open(fileobj=StringIO(squashed.content))

  # The archive must contain exactly these members, in this order.
  expected_image_id = '13a2d9711a3e242fcd50a7627c02d86901ac801b78dcea3147e8ff640078de52'
  json_member = '%s/json' % expected_image_id
  layer_member = '%s/layer.tar' % expected_image_id
  expected_names = [
    'repositories',
    expected_image_id,
    json_member,
    '%s/VERSION' % expected_image_id,
    layer_member,
  ]

  assert archive.getnames() == expected_names

  # The image JSON must parse and name the expected image.
  json_file = archive.extractfile(archive.getmember(json_member))
  parsed = json.loads(json_file.read())
  assert parsed['id'] == expected_image_id

  # The squashed layer must itself be openable as a tar archive.
  layer_file = archive.extractfile(archive.getmember(layer_member))
  tarfile.open(fileobj=layer_file)
|
|
|
|
|
|
@pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [
  # Mount from a repository in the same namespace: succeeds.
  ('devtable', 'devtable', 'baserepo', 'devtable/baserepo', None),

  # Mount from a repository in a different namespace: succeeds.
  ('devtable', 'buynlarge', 'baserepo', 'buynlarge/baserepo', None),

  # Mount from a repository that does not exist: fails.
  ('devtable', 'devtable', 'baserepo', 'unknown/repohere', Failures.UNAUTHORIZED_FOR_MOUNT),

  # Mount from a repository the pushing user has no access to: fails.
  ('public', 'public', 'baserepo', 'public/baserepo', Failures.UNAUTHORIZED_FOR_MOUNT),
])
def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, expected_failure,
                       manifest_protocol, pusher, puller, basic_images, liveserver_session,
                       app_reloader):
  """ Test: Pushing to a new repository while mounting blobs from another repository. """
  devtable_credentials = ('devtable', 'password')

  # Seed the base repository with the image, so there is something to mount from.
  pusher.push(liveserver_session, push_namespace, push_repo, 'latest', basic_images,
              credentials=(push_user, 'password'))

  # Configure a second push that asks for every layer to be mounted from the mount repository.
  mount_options = ProtocolOptions()
  mount_options.scopes = [
    'repository:devtable/newrepo:push,pull',
    'repository:%s:pull' % mount_repo_name,
  ]
  mount_options.mount_blobs = {layer.id: mount_repo_name for layer in basic_images}

  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                         credentials=devtable_credentials,
                         options=mount_options,
                         expected_failure=expected_failure)

  if expected_failure is not None:
    return

  # The mount was expected to succeed, so the new repository must now be pullable.
  puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
              credentials=devtable_credentials)
|
|
|
|
|
|
def get_robot_password(api_caller):
  """ Signs in as `devtable` through the given API caller and returns the `token` field of the
      `buynlarge` organization's `ownerbot` robot.
  """
  api_caller.conduct_auth('devtable', 'password')
  robot_info = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot').json()
  return robot_info['token']
|
|
|
|
|
|
def get_encrypted_password(api_caller):
  """ Signs in as `devtable` through the given API caller, posts the user's password to the
      client key endpoint and returns the `key` field of the response.
  """
  api_caller.conduct_auth('devtable', 'password')

  request_body = json.dumps({'password': 'password'})
  key_response = api_caller.post('/api/v1/user/clientkey', data=request_body,
                                 headers={'Content-Type': 'application/json'})
  return key_response.json()['key']
|
|
|
|
|
|
@pytest.mark.parametrize('username, password, expect_success', [
  # Invalid username.
  ('invaliduser', 'somepassword', False),

  # Invalid password.
  ('devtable', 'invalidpassword', False),

  # Invalid OAuth token.
  ('$oauthtoken', 'unknown', False),

  # Invalid CLI token.
  ('$app', 'invalid', False),

  # Valid.
  ('devtable', 'password', True),

  # Robot.
  ('buynlarge+ownerbot', get_robot_password, True),

  # Encrypted password.
  ('devtable', get_encrypted_password, True),

  # OAuth.
  ('$oauthtoken', 'test', True),

  # CLI Token.
  ('$app', 'test', True),
])
def test_login(username, password, expect_success, loginer, liveserver_session,
               api_caller, app_reloader):
  """ Test: Login flow.

      `password` is either a literal password or a callable that takes the `api_caller`
      fixture and returns the password to log in with.
  """
  # Resolve provider callables (e.g. get_robot_password) into the actual password. Testing
  # for callability rather than `str` keeps non-`str` string values from being called.
  if callable(password):
    password = password(api_caller)

  loginer.login(liveserver_session, username, password, [], expect_success)
|
|
|
|
|
|
@pytest.mark.parametrize('username, password, scopes, expected_access, expect_success', [
  # No scopes.
  ('devtable', 'password', [], [], True),

  # Basic pull.
  ('devtable', 'password', ['repository:devtable/simple:pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['pull']},
  ], True),

  # Basic push.
  ('devtable', 'password', ['repository:devtable/simple:push'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push']},
  ], True),

  # Basic push/pull.
  ('devtable', 'password', ['repository:devtable/simple:push,pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull']},
  ], True),

  # Admin.
  ('devtable', 'password', ['repository:devtable/simple:push,pull,*'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
  ], True),

  # Basic pull with endpoint.
  ('devtable', 'password', ['repository:localhost:5000/devtable/simple:pull'], [
    {'type': 'repository', 'name': 'localhost:5000/devtable/simple', 'actions': ['pull']},
  ], True),

  # Basic pull with invalid endpoint.
  ('devtable', 'password', ['repository:someinvalid/devtable/simple:pull'], [], False),

  # Pull with no access.
  ('public', 'password', ['repository:devtable/simple:pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
  ], True),

  # Anonymous push and pull on a private repository.
  ('', '', ['repository:devtable/simple:pull,push'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
  ], True),

  # Pull and push with no push access.
  ('reader', 'password', ['repository:buynlarge/orgrepo:pull,push'], [
    {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
  ], True),

  # OAuth.
  ('$oauthtoken', 'test', ['repository:public/publicrepo:pull,push'], [
    {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
  ], True),

  # Anonymous public repo.
  ('', '', ['repository:public/publicrepo:pull,push'], [
    {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
  ], True),

  # Multiple scopes.
  ('devtable', 'password',
   ['repository:devtable/simple:push,pull,*', 'repository:buynlarge/orgrepo:pull'], [
     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
     {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
   ], True),

  # Multiple scopes, where only pull is expected on the second repository.
  ('devtable', 'password',
   ['repository:devtable/simple:push,pull,*', 'repository:public/publicrepo:push,pull'], [
     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
     {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
   ], True),
])
def test_login_scopes(username, password, scopes, expected_access, expect_success, v2_protocol,
                      liveserver_session, api_caller, app_reloader):
  """ Test: Login via the V2 auth protocol reacts correctly to requested scopes.

      On a successful login the returned bearer token is decoded and its `access` claim is
      compared against `expected_access`.
  """
  # Accept a password provider callable as well as a literal password. Testing for
  # callability rather than `str` keeps non-`str` string values from being called.
  if callable(password):
    password = password(api_caller)

  response = v2_protocol.login(liveserver_session, username, password, scopes, expect_success)
  if not expect_success:
    # A failed login returns no token to inspect.
    return

  # Validate the returned token.
  encoded = response.json()['token']
  payload = decode_bearer_header('Bearer ' + encoded, instance_keys,
                                 {'SERVER_HOSTNAME': 'localhost:5000'})
  assert payload is not None
  assert payload['access'] == expected_access
|