This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry/registry_tests.py
Joseph Schorr f86c087b3b Prevent registry operations against disabled namespaces
Allows admins to completely wall off a namespace by disabling it

Fixes https://jira.coreos.com/browse/QUAY-869
2018-05-22 18:36:04 -04:00

922 lines
37 KiB
Python

# pylint: disable=W0401, W0621, W0613, W0614, R0913
import hashlib
import tarfile
from cStringIO import StringIO
import binascii
import bencode
import resumablehashlib
from test.fixtures import *
from test.registry.liveserverfixture import *
from test.registry.fixtures import *
from test.registry.protocol_fixtures import *
from test.registry.protocols import Failures, Image, layer_bytes_for_contents, ProtocolOptions
from app import instance_keys
from util.security.registry_jwt import decode_bearer_header
from util.timedeltastring import convert_to_timedelta
def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_reloader):
  """ Test: An image pushed to a brand new repository can be pulled back again. """
  target = (liveserver_session, 'devtable', 'newrepo', 'latest', basic_images)
  auth = ('devtable', 'password')

  # Pushing creates the repository.
  pusher.push(*target, credentials=auth)

  # Pulling the same tag confirms the push landed.
  puller.pull(*target, credentials=auth)
def test_basic_push_pull_by_manifest(manifest_protocol, basic_images, liveserver_session,
                                     app_reloader):
  """ Test: Push an image to a new repository, then pull it back by manifest digest. """
  auth = ('devtable', 'password')

  # Pushing creates the repository; keep the result so the digests can be read from it.
  push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                       basic_images, credentials=auth)

  # Collect the digest of every manifest reported by the push and pull by those digests.
  manifest_digests = []
  for pushed_manifest in push_result.manifests.values():
    manifest_digests.append(str(pushed_manifest.digest))

  manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', manifest_digests,
                         basic_images, credentials=auth)
def test_push_invalid_credentials(pusher, basic_images, liveserver_session, app_reloader):
  """ Test: A push made with a wrong password must fail as unauthenticated. """
  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
              credentials=('devtable', 'notcorrectpassword'),
              expected_failure=Failures.UNAUTHENTICATED)
def test_pull_invalid_credentials(puller, basic_images, liveserver_session, app_reloader):
  """ Test: A pull made with a wrong password must fail as unauthenticated. """
  puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
              credentials=('devtable', 'notcorrectpassword'),
              expected_failure=Failures.UNAUTHENTICATED)
def test_push_pull_formerly_bad_repo_name(pusher, puller, basic_images, liveserver_session,
                                          app_reloader):
  """ Test: Push and pull against a new repository whose name (`foo.bar`) used to be
      rejected.
  """
  target = (liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images)
  auth = ('devtable', 'password')

  # Pushing creates the repository.
  pusher.push(*target, credentials=auth)

  # Pulling the same tag confirms the push landed.
  puller.pull(*target, credentials=auth)
def test_application_repo(pusher, puller, basic_images, liveserver_session, app_reloader,
                          registry_server_executor, liveserver):
  """ Test: Registry pushes and pulls against an *application* repository are rejected
      with `Failures.APP_REPOSITORY`.
  """
  target = (liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images)
  auth = ('devtable', 'password')

  # Set up the application repository on the live server.
  registry_server_executor.on(liveserver).create_app_repository('devtable', 'someapprepo')

  # Neither direction of the registry protocol may touch it.
  pusher.push(*target, credentials=auth, expected_failure=Failures.APP_REPOSITORY)
  puller.pull(*target, credentials=auth, expected_failure=Failures.APP_REPOSITORY)
def test_middle_layer_different_sha(manifest_protocol, v1_protocol, liveserver_session,
                                    app_reloader):
  """ Test: Re-pushing a 3-layer image that keeps the *same* V1 IDs but changes the bytes
      of the middle layer must produce a different ID for the leaf layer, because the two
      leaves no longer describe the same "image".
  """
  auth = ('devtable', 'password')

  base_layer = Image(id='baseimage', parent_id=None, size=None,
                     bytes=layer_bytes_for_contents('base'))
  middle_layer = Image(id='middleimage', parent_id='baseimage', size=None,
                       bytes=layer_bytes_for_contents('middle'))
  leaf_layer = Image(id='leafimage', parent_id='middleimage', size=None,
                     bytes=layer_bytes_for_contents('leaf'))
  first_images = [base_layer, middle_layer, leaf_layer]

  # Round-trip the unmodified images first, as a sanity check of the baseline.
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', first_images,
                         credentials=auth)
  baseline_pull = manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
                                         first_images, credentials=auth)

  pushed_ids = {layer.id for layer in first_images}
  assert pushed_ids == set(baseline_pull.manifests['latest'].image_ids)
  assert baseline_pull.image_ids['latest'] == 'leafimage'

  # Same IDs and parents, but the middle layer now carries different *bytes*.
  second_images = first_images[:]
  second_images[1] = Image(id='middleimage', parent_id='baseimage', size=None,
                           bytes=layer_bytes_for_contents('different middle bytes'))

  # Push the altered image and pull it back over V1; the leaf must come back under
  # an ID other than the one that was pushed.
  options = ProtocolOptions()
  options.munge_shas = True
  options.skip_head_checks = True

  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', second_images,
                         credentials=auth, options=options)
  altered_pull = v1_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
                                  second_images, credentials=auth, options=options)
  assert altered_pull.image_ids['latest'] != 'leafimage'
def add_robot(api_caller, _):
  """ Returns `(username, token)` credentials for the `buynlarge+ownerbot` robot, with the
      token looked up through the API while authenticated as `devtable`. The second
      argument is ignored.
  """
  api_caller.conduct_auth('devtable', 'password')
  robot_info = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot').json()
  return ('buynlarge+ownerbot', robot_info['token'])
def add_token(_, executor):
  """ Returns `('$token', <token>)` credentials, where the token is the text of the response
      from `executor.add_token()`. The first argument is ignored.
  """
  token_response = executor.add_token()
  return ('$token', token_response.text)
@pytest.mark.parametrize('credentials, namespace, expected_performer', [
  (('devtable', 'password'), 'devtable', 'devtable'),
  (add_robot, 'buynlarge', 'buynlarge+ownerbot'),
  (('$oauthtoken', 'test'), 'devtable', 'devtable'),
  (('$app', 'test'), 'devtable', 'devtable'),
  (add_token, 'devtable', None),
])
def test_push_pull_logging(credentials, namespace, expected_performer, pusher, puller, basic_images,
                           liveserver_session, liveserver, api_caller, app_reloader,
                           registry_server_executor):
  """ Test: A push and a pull each add an entry to the repository's logs. """
  def fetch_and_check_logs(expected_count, expected_kind):
    """ Loads the repository logs and validates their count and the first entry. """
    response = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace)
    entries = response.json()['logs']
    assert len(entries) == expected_count

    first_entry = entries[0]
    assert first_entry['kind'] == expected_kind
    assert first_entry['metadata']['namespace'] == namespace
    assert first_entry['metadata']['repo'] == 'newrepo'
    if expected_performer is not None:
      assert first_entry['performer']['name'] == expected_performer

  # The repository must exist before the push under test, so seed it with a push.
  seed_images = [Image(id='startimage', parent_id=None, size=None,
                       bytes=layer_bytes_for_contents('start image'))]
  pusher.push(liveserver_session, namespace, 'newrepo', 'latest', seed_images,
              credentials=('devtable', 'password'))

  # Non-tuple credentials are factories. They are resolved only now because some of them
  # attach the newly created entity to the repository, which therefore has to exist.
  if not isinstance(credentials, tuple):
    credentials = credentials(api_caller, registry_server_executor.on(liveserver))

  # The push under test.
  pusher.push(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
              credentials=credentials)

  # Two log entries are expected now, with the push first.
  api_caller.conduct_auth('devtable', 'password')
  fetch_and_check_logs(2, 'push_repo')

  # The pull under test.
  puller.pull(liveserver_session, namespace, 'newrepo', 'latest', basic_images,
              credentials=credentials)

  # A third entry is expected, with the pull first.
  fetch_and_check_logs(3, 'pull_repo')
def test_pull_publicrepo_anonymous(pusher, puller, basic_images, liveserver_session,
                                   app_reloader, api_caller, liveserver):
  """ Test: A repository can be pulled without credentials only once it is public. """
  target = (liveserver_session, 'public', 'newrepo', 'latest', basic_images)

  # Push as the `public` user so that there is a repository to pull.
  pusher.push(*target, credentials=('public', 'password'))

  # The repository starts out private, so an anonymous pull is refused...
  puller.pull(*target, expected_failure=Failures.UNAUTHORIZED)

  # ... and so is a pull by a user other than `public`.
  puller.pull(*target, credentials=('devtable', 'password'),
              expected_failure=Failures.UNAUTHORIZED)

  # Flip the repository to public.
  api_caller.conduct_auth('public', 'password')
  api_caller.change_repo_visibility('public', 'newrepo', 'public')

  # Now the anonymous pull goes through.
  puller.pull(*target)
def test_pull_publicrepo_no_anonymous_access(pusher, puller, basic_images, liveserver_session,
                                             app_reloader, api_caller, liveserver,
                                             registry_server_executor):
  """ Test: With the `ANONYMOUS_ACCESS` feature flag off, a public repository cannot be
      pulled anonymously but can be pulled by an authenticated user.
  """
  target = (liveserver_session, 'public', 'newrepo', 'latest', basic_images)
  other_user = ('devtable', 'password')

  # Push as the `public` user so that there is a repository to pull.
  pusher.push(*target, credentials=('public', 'password'))

  # The repository starts out private, so an anonymous pull is refused...
  puller.pull(*target, expected_failure=Failures.UNAUTHORIZED)

  # ... and so is a pull by a user other than `public`.
  puller.pull(*target, credentials=other_user, expected_failure=Failures.UNAUTHORIZED)

  # Flip the repository to public.
  api_caller.conduct_auth('public', 'password')
  api_caller.change_repo_visibility('public', 'newrepo', 'public')

  with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
    # Although the repository is public now, anonymous access is switched off, so the
    # anonymous pull still fails.
    puller.pull(*target, expected_failure=Failures.ANONYMOUS_NOT_ALLOWED)

    # An authenticated user other than `public` is now allowed in.
    puller.pull(*target, credentials=other_user)
def test_basic_organization_flow(pusher, puller, basic_images, liveserver_session, app_reloader):
  """ Test: Organization members push to and pull from a new organization repository. """
  target = (liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images)

  # The `creator` user creates the repository under the organization by pushing.
  pusher.push(*target, credentials=('creator', 'password'))

  # Both the creator and the admin (`devtable`) must be able to pull it.
  for allowed_user in ('creator', 'devtable'):
    puller.pull(*target, credentials=(allowed_user, 'password'))

  # The `reader` user must *not* be able to pull it.
  puller.pull(*target, credentials=('reader', 'password'),
              expected_failure=Failures.UNAUTHORIZED)
def test_library_support(pusher, puller, basic_images, liveserver_session, app_reloader):
  """ Test: Push to the implicit (empty) library namespace, then pull through both the
      implicit and the explicit `library` namespace.
  """
  auth = ('devtable', 'password')

  # Push using the empty namespace.
  pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images, credentials=auth)

  # Pull first through the empty namespace, then through `library`.
  for pull_namespace in ('', 'library'):
    puller.pull(liveserver_session, pull_namespace, 'newrepo', 'latest', basic_images,
                credentials=auth)
def test_library_namespace_with_support_disabled(pusher, puller, basic_images, liveserver_session,
                                                 app_reloader, liveserver,
                                                 registry_server_executor):
  """ Test: The explicit `library` namespace keeps working for push and pull while the
      `LIBRARY_SUPPORT` feature flag is off.
  """
  target = (liveserver_session, 'library', 'newrepo', 'latest', basic_images)
  auth = ('devtable', 'password')

  with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
    # Pushing creates the repository under `library`.
    pusher.push(*target, credentials=auth)

    # Pulling it back from `library` confirms the push landed.
    puller.pull(*target, credentials=auth)
def test_push_library_with_support_disabled(pusher, basic_images, liveserver_session,
                                            app_reloader, liveserver,
                                            registry_server_executor):
  """ Test: Pushing to the implicit (empty) library namespace must fail while the
      `LIBRARY_SUPPORT` feature flag is off.
  """
  flag_off = FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver))
  with flag_off:
    # The push to the empty namespace is expected to be refused.
    pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
                credentials=('devtable', 'password'),
                expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
def test_pull_library_with_support_disabled(puller, basic_images, liveserver_session,
                                            app_reloader, liveserver,
                                            registry_server_executor):
  """ Test: Pulling from the implicit (empty) library namespace must fail while the
      `LIBRARY_SUPPORT` feature flag is off.
  """
  flag_off = FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver))
  with flag_off:
    # The pull through the empty namespace is expected to be refused.
    puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
                credentials=('devtable', 'password'),
                expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
def test_image_replication(pusher, basic_images, liveserver_session, app_reloader, liveserver,
                           registry_server_executor):
  """ Test: With `STORAGE_REPLICATION` on, every pushed image gets a replication entry. """
  auth = ('devtable', 'password')

  with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)):
    pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                credentials=auth)

    # Ask the server about each pushed image in turn; it must answer `OK`.
    for pushed_image in basic_images:
      lookup = registry_server_executor.on(liveserver).get_storage_replication_entry(
        pushed_image.id)
      assert lookup.text == 'OK'
def test_push_reponame_with_slashes(pusher, basic_images, liveserver_session, app_reloader):
  """ Test: Pushing to a repository name that contains a slash is unsupported and must
      fail as an invalid repository.
  """
  pusher.push(liveserver_session, 'devtable', 'some/slash', 'latest', basic_images,
              credentials=('devtable', 'password'),
              expected_failure=Failures.INVALID_REPOSITORY)
@pytest.mark.parametrize('tag_name, expected_failure', [
  ('l', None),
  ('1', None),
  ('x' * 128, None),

  ('', Failures.MISSING_TAG),
  ('x' * 129, Failures.INVALID_TAG),
  ('.fail', Failures.INVALID_TAG),
  ('-fail', Failures.INVALID_TAG),
])
def test_tag_validaton(tag_name, expected_failure, pusher, basic_images, liveserver_session,
                       app_reloader):
  """ Test: Pushing under each tag name succeeds or fails as the parameters dictate. """
  push_kwargs = {
    'credentials': ('devtable', 'password'),
    'expected_failure': expected_failure,
  }
  pusher.push(liveserver_session, 'devtable', 'newrepo', tag_name, basic_images, **push_kwargs)
def test_invalid_parent(pusher, liveserver_session, app_reloader):
  """ Test: Pushing an image whose parent is not part of the push must fail. """
  # `parentimage` is referenced but never supplied.
  orphan = Image(id='childimage', parent_id='parentimage', size=None,
                 bytes=layer_bytes_for_contents('child'))

  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', [orphan],
              credentials=('devtable', 'password'),
              expected_failure=Failures.INVALID_IMAGES)
def test_wrong_image_order(pusher, liveserver_session, app_reloader):
  """ Test: Pushing an image whose layers are listed child-before-parent must fail. """
  child = Image(id='childimage', parent_id='parentimage', size=None,
                bytes=layer_bytes_for_contents('child'))
  parent = Image(id='parentimage', parent_id=None, size=None,
                 bytes=layer_bytes_for_contents('parent'))

  # Deliberately list the child ahead of its parent.
  misordered = [child, parent]

  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', misordered,
              credentials=('devtable', 'password'),
              expected_failure=Failures.INVALID_IMAGES)
@pytest.mark.parametrize('labels', [
  # Basic labels.
  [('foo', 'bar', 'text/plain'), ('baz', 'meh', 'text/plain')],

  # Theoretically invalid, but allowed when pushed via registry protocol.
  [('theoretically-invalid--label', 'foo', 'text/plain')],

  # JSON label.
  [('somejson', '{"some": "json"}', 'application/json'), ('plain', '', 'text/plain')],

  # JSON-esque (but not valid JSON) labels.
  [('foo', '[hello world]', 'text/plain'), ('bar', '{wassup?!}', 'text/plain')],
])
def test_labels(labels, manifest_protocol, liveserver_session, api_caller, app_reloader):
  """ Test: Labels carried in the config of a pushed image are reported by the manifest
      labels API after the push, with the expected source and media types.
  """
  # Each parameter entry is (key, value, expected media type); only key/value are pushed.
  label_config = dict((key, value) for key, value, _ in labels)
  images = [
    Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
          config={'Labels': label_config}),
  ]

  push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                       images, credentials=('devtable', 'password'))
  digest = push_result.manifests['latest'].digest

  # Read the labels back for the pushed manifest.
  api_caller.conduct_auth('devtable', 'password')
  labels_url = '/api/v1/repository/devtable/newrepo/manifest/%s/labels' % digest
  labels_found = api_caller.get(labels_url).json()['labels']
  assert len(labels_found) == len(labels)

  found_by_key = dict((entry['key'], entry) for entry in labels_found)
  assert set(images[0].config['Labels'].keys()) == set(found_by_key.keys())

  for key, _, media_type in labels:
    found = found_by_key[key]
    assert found['source_type'] == 'manifest'
    assert found['media_type'] == media_type
@pytest.mark.parametrize('label_value, expected_expiration', [
  ('1d', True),
  ('1h', True),
  ('2w', True),

  ('1g', False),
  ('something', False),
])
def test_expiration_label(label_value, expected_expiration, manifest_protocol, liveserver_session,
                          api_caller, app_reloader):
  """ Test: A tag pushed with a valid `quay.expires-after` label ends at its start time plus
      the labelled duration; with an invalid duration the tag gets no end time.
  """
  labelled_image = Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
                         config={'Labels': {'quay.expires-after': label_value}})

  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', [labelled_image],
                         credentials=('devtable', 'password'))

  # Look at the first tag the API lists for the repository.
  api_caller.conduct_auth('devtable', 'password')
  tag_listing = api_caller.get('/api/v1/repository/devtable/newrepo/tag').json()
  tag_data = tag_listing['tags'][0]

  if not expected_expiration:
    assert tag_data.get('end_ts') is None
  else:
    duration_seconds = convert_to_timedelta(label_value).total_seconds()
    assert tag_data['end_ts'] == tag_data['start_ts'] + duration_seconds
@pytest.mark.parametrize('content_type', [
  'application/vnd.oci.image.manifest.v1+json',
  'application/vnd.docker.distribution.manifest.v2+json',
  'application/vnd.foo.bar',
])
def test_unsupported_manifest_content_type(content_type, manifest_protocol, basic_images,
                                           liveserver_session, app_reloader):
  """ Test: Pushing a manifest under an unsupported media type must be rejected. """
  options = ProtocolOptions()
  options.manifest_content_type = content_type

  # The push is expected to fail because of the content type.
  push_kwargs = dict(credentials=('devtable', 'password'),
                     options=options,
                     expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE)
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                         **push_kwargs)
def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_session, app_reloader):
  """ Test: Pushing a manifest that references an invalid blob must be rejected. """
  options = ProtocolOptions()
  options.manifest_invalid_blob_references = True

  # The push is expected to fail because of the bad blob reference.
  push_kwargs = dict(credentials=('devtable', 'password'),
                     options=options,
                     expected_failure=Failures.INVALID_BLOB)
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                         **push_kwargs)
def test_delete_tag(manifest_protocol, puller, basic_images, liveserver_session,
                    app_reloader):
  """ Test: After one of two pushed tags is deleted by digest, only the other tag can
      still be pulled.
  """
  auth = ('devtable', 'password')

  # Push the same images under the tags `one` and `two`.
  push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo',
                                       ['one', 'two'], basic_images, credentials=auth)

  # Remove tag `one`, addressing it by the digest of its manifest.
  digest_of_one = push_result.manifests['one'].digest
  manifest_protocol.delete(liveserver_session, 'devtable', 'newrepo', digest_of_one,
                           credentials=auth)

  # Tag `one` must be gone now...
  puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images,
              credentials=auth, expected_failure=Failures.UNKNOWN_TAG)

  # ... whereas tag `two` must still pull.
  puller.pull(liveserver_session, 'devtable', 'newrepo', 'two', basic_images,
              credentials=auth)
def test_cancel_upload(manifest_protocol, basic_images, liveserver_session, app_reloader):
  """ Test: Cancelation of blob uploads.

      Pushes with the `cancel_blob_upload` protocol option set. How the cancelation is
      carried out and verified is up to the protocol implementation behind the
      `manifest_protocol` fixture.
  """
  credentials = ('devtable', 'password')

  options = ProtocolOptions()
  options.cancel_blob_upload = True

  # The options must actually be handed to the push; without them this is an ordinary
  # push and the cancelation path is never exercised.
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                         basic_images, credentials=credentials, options=options)
def test_blob_caching(manifest_protocol, basic_images, liveserver_session, app_reloader,
                      liveserver, registry_server_executor):
  """ Test: Once blobs have been pulled, they can be fetched again even with the database
      unavailable, i.e. they are being served from a cache.
  """
  auth = ('devtable', 'password')

  # Push a tag and keep the result for its layer digests and headers.
  push_result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                                       basic_images, credentials=auth)

  # A first pull primes the cache.
  manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
                         credentials=auth)

  # Cut the server off from its database.
  registry_server_executor.on(liveserver).break_database()

  # Every blob must still be retrievable. A broken cache would surface here, as the
  # request would then have to go to the (now broken) database.
  for layer in push_result.manifests['latest'].layers:
    blob_url = '/v2/devtable/newrepo/blobs/%s' % str(layer.digest)
    blob_response = liveserver_session.get(blob_url, headers=push_result.headers)
    assert blob_response.status_code == 200
@pytest.mark.parametrize('chunks', [
  # Two chunks.
  [(0, 100), (100, None)],

  # Multiple chunks.
  [(0, 10), (10, 20), (20, None)],
  [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, None)],

  # Overlapping chunks.
  [(0, 1024), (10, None)],
])
def test_chunked_blob_uploading(chunks, random_layer_data, manifest_protocol, puller,
                                liveserver_session, app_reloader):
  """ Test: A blob uploaded as a series of chunks yields an image that can be pulled. """
  auth = ('devtable', 'password')

  # A falsy end byte (the parameters use `None`) stands for "up to the end of the data".
  total_length = len(random_layer_data)
  options = ProtocolOptions()
  options.chunks_for_upload = [(start_byte, end_byte or total_length)
                               for start_byte, end_byte in chunks]

  images = [
    Image(id='theimage', parent_id=None, bytes=random_layer_data),
  ]

  # Push with the requested chunking.
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                         images, credentials=auth, options=options)

  # A successful pull shows that the image was created.
  puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images,
              credentials=auth)
def test_chunked_uploading_mismatched_chunks(manifest_protocol, random_layer_data,
                                             liveserver_session, app_reloader):
  """ Test: A chunked upload that leaves a gap in the data. """
  layer = Image(id='theimage', parent_id=None, bytes=random_layer_data)

  # The second chunk starts at 101, so byte #100 is never sent. The third element of
  # that chunk (416) is presumably the status code the protocol should expect for it;
  # confirm against the protocol implementation.
  first_chunk = (0, 100)
  gapped_chunk = (101, len(random_layer_data), 416)

  options = ProtocolOptions()
  options.chunks_for_upload = [first_chunk, gapped_chunk]

  # Attempt the push with the gapped chunk list.
  manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
                         [layer], credentials=('devtable', 'password'), options=options)
def test_pull_disabled_namespace(pusher, puller, basic_images, liveserver_session,
                                 app_reloader, liveserver, registry_server_executor):
  """ Test: Pulling a repository whose namespace has been disabled must fail. """
  target = (liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images)
  auth = ('devtable', 'password')

  # Create the repository while the namespace is still enabled.
  pusher.push(*target, credentials=auth)

  # Disable the namespace on the live server.
  registry_server_executor.on(liveserver).disable_namespace('buynlarge')

  # The pull must now be refused.
  puller.pull(*target, credentials=auth, expected_failure=Failures.NAMESPACE_DISABLED)
def test_push_disabled_namespace(pusher, basic_images, liveserver_session,
                                 app_reloader, liveserver, registry_server_executor):
  """ Test: Pushing a repository to a disabled namespace must fail. """
  # Disable the namespace on the live server before anything is pushed.
  registry_server_executor.on(liveserver).disable_namespace('buynlarge')

  # The push must be refused.
  pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images,
              credentials=('devtable', 'password'),
              expected_failure=Failures.NAMESPACE_DISABLED)
@pytest.mark.parametrize('public_catalog, credentials, expected_repos', [
  # No public access and no credentials => No results.
  (False, None, None),

  # Public access and no credentials => public repositories.
  (True, None, ['public/publicrepo']),

  # Private creds => private repositories.
  (False, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
  (True, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
])
@pytest.mark.parametrize('page_size', [
  1,
  2,
  10,
  50,
  100,
])
def test_catalog(public_catalog, credentials, expected_repos, page_size, v2_protocol,
                 liveserver_session, app_reloader, liveserver, registry_server_executor):
  """ Test: The V2 catalog lists the repositories expected for the given credentials and
      `PUBLIC_CATALOG` setting, for every page size.
  """
  catalog_flag = FeatureFlagValue('PUBLIC_CATALOG', public_catalog,
                                  registry_server_executor.on(liveserver))
  with catalog_flag:
    found_repos = v2_protocol.catalog(liveserver_session, page_size=page_size,
                                      credentials=credentials, namespace='devtable',
                                      repo_name='simple')

    if expected_repos is not None:
      # Every expected repository must be listed; extra ones are tolerated.
      assert set(expected_repos).issubset(set(found_repos))
    else:
      assert len(found_repos) == 0
def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
                      registry_server_executor, app_reloader):
  """ Test: A torrent can be retrieved for the blob of every layer of a pushed image. """
  auth = ('devtable', 'password')

  # Push the image whose blobs will be requested as torrents.
  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
              credentials=auth)

  # Torrents require direct download support to be switched on.
  registry_server_executor.on(liveserver).set_supports_direct_download(True)

  for layer in basic_images:
    # Fetch and decode the torrent for this layer's blob.
    blob_digest = 'sha256:' + hashlib.sha256(layer.bytes).hexdigest()
    torrent_url = '/c1/torrent/devtable/newrepo/blobs/%s' % blob_digest
    torrent_response = liveserver_session.get(torrent_url, auth=auth)
    torrent_info = bencode.bdecode(torrent_response.content)

    # Check the `url-list` entry.
    assert torrent_info['url-list'] == 'http://somefakeurl?goes=here'

    # The piece data and an announce entry must both be present.
    assert torrent_info.get('info', {}).get('pieces') is not None
    assert torrent_info.get('announce') is not None

    # The pieces must equal the SHA-1 of the layer bytes.
    layer_sha = resumablehashlib.sha1()
    layer_sha.update(layer.bytes)

    expected_pieces = binascii.hexlify(layer_sha.digest())
    found_pieces = binascii.hexlify(torrent_info['info']['pieces'])
    assert expected_pieces == found_pieces
@pytest.mark.parametrize('use_estimates', [
  False,
  True,
])
def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session,
                         liveserver, registry_server_executor, app_reloader):
  """ Test: The squashed form of a pushed image can be downloaded and has the expected
      layout, both with stored and with cleared uncompressed sizes.
  """
  auth = ('devtable', 'password')

  # Push the image that will be squashed.
  pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images,
              credentials=auth)

  if use_estimates:
    # Drop the stored uncompressed sizes so that the server has to estimate them.
    for pushed_image in sized_images:
      registry_server_executor.on(liveserver).clear_uncompressed_size(pushed_image.id)

  # Download the squashed image and open it as a tar archive.
  squash_response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=auth)
  squashed_tar = tarfile.open(fileobj=StringIO(squash_response.content))

  # The archive must hold exactly these members, in this order.
  expected_image_id = '13a2d9711a3e242fcd50a7627c02d86901ac801b78dcea3147e8ff640078de52'
  json_member = '%s/json' % expected_image_id
  version_member = '%s/VERSION' % expected_image_id
  layer_member = '%s/layer.tar' % expected_image_id

  assert squashed_tar.getnames() == ['repositories',
                                     expected_image_id,
                                     json_member,
                                     version_member,
                                     layer_member]

  # The image JSON must parse and name the expected image.
  raw_json = squashed_tar.extractfile(squashed_tar.getmember(json_member)).read()
  image_json = json.loads(raw_json)
  assert image_json['id'] == expected_image_id

  # The squashed layer must itself be a tar archive that opens.
  layer_file = squashed_tar.extractfile(squashed_tar.getmember(layer_member))
  tarfile.open(fileobj=layer_file)
def get_robot_password(api_caller):
  """ Returns the token of the `buynlarge+ownerbot` robot, looked up through the API while
      authenticated as `devtable`.
  """
  api_caller.conduct_auth('devtable', 'password')
  robot_info = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot').json()
  return robot_info['token']
def get_encrypted_password(api_caller):
  """ Returns the `key` obtained by posting the password of `devtable` to the
      `/api/v1/user/clientkey` endpoint while authenticated as that user.
  """
  api_caller.conduct_auth('devtable', 'password')

  request_body = json.dumps(dict(password='password'))
  key_response = api_caller.post('/api/v1/user/clientkey', data=request_body,
                                 headers={'Content-Type': 'application/json'})
  return key_response.json()['key']
@pytest.mark.parametrize('username, password, expect_success', [
  # Invalid username.
  ('invaliduser', 'somepassword', False),

  # Invalid password.
  ('devtable', 'invalidpassword', False),

  # Invalid OAuth token.
  ('$oauthtoken', 'unknown', False),

  # Invalid CLI token.
  ('$app', 'invalid', False),

  # Valid.
  ('devtable', 'password', True),

  # Robot.
  ('buynlarge+ownerbot', get_robot_password, True),

  # Encrypted password.
  ('devtable', get_encrypted_password, True),

  # OAuth.
  ('$oauthtoken', 'test', True),

  # CLI Token.
  ('$app', 'test', True),
])
def test_login(username, password, expect_success, loginer, liveserver_session,
               api_caller, app_reloader):
  """ Test: Logging in succeeds or fails as expected for each kind of credential. """
  # A non-string password is a factory that produces the real password via the API.
  resolved_password = password if isinstance(password, str) else password(api_caller)

  loginer.login(liveserver_session, username, resolved_password, [], expect_success)
@pytest.mark.parametrize('username, password, scopes, expected_access, expect_success', [
  # No scopes.
  ('devtable', 'password', [], [], True),

  # Basic pull.
  ('devtable', 'password', ['repository:devtable/simple:pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['pull']},
  ], True),

  # Basic push.
  ('devtable', 'password', ['repository:devtable/simple:push'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push']},
  ], True),

  # Basic push/pull.
  ('devtable', 'password', ['repository:devtable/simple:push,pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull']},
  ], True),

  # Admin.
  ('devtable', 'password', ['repository:devtable/simple:push,pull,*'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
  ], True),

  # Basic pull with endpoint.
  ('devtable', 'password', ['repository:localhost:5000/devtable/simple:pull'], [
    {'type': 'repository', 'name': 'localhost:5000/devtable/simple', 'actions': ['pull']},
  ], True),

  # Basic pull with invalid endpoint.
  ('devtable', 'password', ['repository:someinvalid/devtable/simple:pull'], [], False),

  # Pull with no access.
  ('public', 'password', ['repository:devtable/simple:pull'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
  ], True),

  # Anonymous push and pull on a private repository.
  ('', '', ['repository:devtable/simple:pull,push'], [
    {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
  ], True),

  # Pull and push with no push access.
  ('reader', 'password', ['repository:buynlarge/orgrepo:pull,push'], [
    {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
  ], True),

  # OAuth.
  ('$oauthtoken', 'test', ['repository:public/publicrepo:pull,push'], [
    {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
  ], True),

  # Anonymous public repo.
  ('', '', ['repository:public/publicrepo:pull,push'], [
    {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
  ], True),

  # Multiple scopes.
  ('devtable', 'password',
   ['repository:devtable/simple:push,pull,*', 'repository:buynlarge/orgrepo:pull'], [
     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
     {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
   ], True),

  # Multiple scopes.
  ('devtable', 'password',
   ['repository:devtable/simple:push,pull,*', 'repository:public/publicrepo:push,pull'], [
     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
     {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
   ], True),
])
def test_login_scopes(username, password, scopes, expected_access, expect_success, v2_protocol,
                      liveserver_session, api_caller, app_reloader):
  """ Test: A V2 login grants exactly the expected access for the requested scopes. """
  # A non-string password is a factory that produces the real password via the API.
  resolved_password = password if isinstance(password, str) else password(api_caller)

  login_response = v2_protocol.login(liveserver_session, username, resolved_password, scopes,
                                     expect_success)

  if expect_success:
    # Decode the bearer token that was issued and compare the access it grants.
    bearer_header = 'Bearer ' + login_response.json()['token']
    payload = decode_bearer_header(bearer_header, instance_keys,
                                   {'SERVER_HOSTNAME': 'localhost:5000'})
    assert payload is not None
    assert payload['access'] == expected_access