# pylint: disable=W0401, W0621, W0613, W0614, R0913 import hashlib import tarfile from cStringIO import StringIO import binascii import bencode import resumablehashlib from werkzeug.datastructures import Accept from test.fixtures import * from test.registry.liveserverfixture import * from test.registry.fixtures import * from test.registry.protocol_fixtures import * from test.registry.protocols import Failures, Image, layer_bytes_for_contents, ProtocolOptions from app import instance_keys from data.model.tag import list_repository_tags from image.docker.schema1 import DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE from util.security.registry_jwt import decode_bearer_header from util.timedeltastring import convert_to_timedelta def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Basic push and pull of an image to a new repository. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Pull the repository to verify. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) def test_multi_layer_images_push_pull(pusher, puller, multi_layer_images, liveserver_session, app_reloader): """ Test: Basic push and pull of a multi-layered image to a new repository. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images, credentials=credentials) # Pull the repository to verify. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images, credentials=credentials) def test_no_tag_manifests(pusher, puller, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Basic pull without manifests. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Delete all tag manifests. registry_server_executor.on(liveserver).delete_manifests() # Ensure we can still pull. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) def test_basic_push_pull_by_manifest(manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Basic push and pull-by-manifest of an image to a new repository. """ credentials = ('devtable', 'password') # Push a new repository. result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Pull the repository by digests to verify. digests = [str(manifest.digest) for manifest in result.manifests.values()] manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', digests, basic_images, credentials=credentials) def test_basic_push_by_manifest_digest(manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Basic push-by-manifest and pull-by-manifest of an image to a new repository. """ credentials = ('devtable', 'password') # Push a new repository. options = ProtocolOptions() options.push_by_manifest_digest = True result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials, options=options) # Pull the repository by digests to verify. digests = [str(manifest.digest) for manifest in result.manifests.values()] manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', digests, basic_images, credentials=credentials) def test_push_invalid_credentials(pusher, basic_images, liveserver_session, app_reloader): """ Test: Ensure we get auth errors when trying to push with invalid credentials. """ invalid_credentials = ('devtable', 'notcorrectpassword') pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED) def test_pull_invalid_credentials(puller, basic_images, liveserver_session, app_reloader): """ Test: Ensure we get auth errors when trying to pull with invalid credentials. """ invalid_credentials = ('devtable', 'notcorrectpassword') puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED) def test_push_pull_formerly_bad_repo_name(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Basic push and pull of an image to a new repository with a name that formerly failed. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images, credentials=credentials) # Pull the repository to verify. puller.pull(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images, credentials=credentials) def test_application_repo(pusher, puller, basic_images, liveserver_session, app_reloader, registry_server_executor, liveserver): """ Test: Attempting to push or pull from an *application* repository raises a 405. """ credentials = ('devtable', 'password') registry_server_executor.on(liveserver).create_app_repository('devtable', 'someapprepo') # Attempt to push to the repository. pusher.push(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.APP_REPOSITORY) # Attempt to pull from the repository. puller.pull(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.APP_REPOSITORY) def test_middle_layer_different_sha(manifest_protocol, v1_protocol, liveserver_session, app_reloader): """ Test: Pushing of a 3-layer image with the *same* V1 ID's, but the middle layer having different bytes, must result in new IDs being generated for the leaf layer, as they point to different "images". """ credentials = ('devtable', 'password') first_images = [ Image(id='baseimage', parent_id=None, size=None, bytes=layer_bytes_for_contents('base')), Image(id='middleimage', parent_id='baseimage', size=None, bytes=layer_bytes_for_contents('middle')), Image(id='leafimage', parent_id='middleimage', size=None, bytes=layer_bytes_for_contents('leaf')), ] # First push and pull the images, to ensure we have the basics setup and working. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', first_images, credentials=credentials) first_pull_result = manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', first_images, credentials=credentials) first_manifest = first_pull_result.manifests['latest'] assert set([image.id for image in first_images]) == set(first_manifest.image_ids) assert first_pull_result.image_ids['latest'] == 'leafimage' # Next, create an image list with the middle image's *bytes* changed. second_images = list(first_images) second_images[1] = Image(id='middleimage', parent_id='baseimage', size=None, bytes=layer_bytes_for_contents('different middle bytes')) # Push and pull the image, ensuring that the produced ID for the middle and leaf layers # are synthesized. options = ProtocolOptions() options.munge_shas = True options.skip_head_checks = True manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', second_images, credentials=credentials, options=options) second_pull_result = v1_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', second_images, credentials=credentials, options=options) assert second_pull_result.image_ids['latest'] != 'leafimage' def add_robot(api_caller, _): api_caller.conduct_auth('devtable', 'password') resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot') return ('buynlarge+ownerbot', resp.json()['token']) def add_token(_, executor): return ('$token', executor.add_token().text) @pytest.mark.parametrize('credentials, namespace, expected_performer', [ (('devtable', 'password'), 'devtable', 'devtable'), (add_robot, 'buynlarge', 'buynlarge+ownerbot'), (('$oauthtoken', 'test'), 'devtable', 'devtable'), (('$app', 'test'), 'devtable', 'devtable'), (add_token, 'devtable', None), ]) def test_push_pull_logging(credentials, namespace, expected_performer, pusher, puller, basic_images, liveserver_session, liveserver, api_caller, app_reloader, registry_server_executor): """ Test: Basic push and pull, ensuring that logs are added for each operation. """ # Create the repository before the test push. start_images = [Image(id='startimage', parent_id=None, size=None, bytes=layer_bytes_for_contents('start image'))] pusher.push(liveserver_session, namespace, 'newrepo', 'latest', start_images, credentials=('devtable', 'password')) # Retrieve the credentials to use. This must be after the repo is created, because # some credentials creation code adds the new entity to the repository. if not isinstance(credentials, tuple): credentials = credentials(api_caller, registry_server_executor.on(liveserver)) # Push to the repository with the specified credentials. pusher.push(liveserver_session, namespace, 'newrepo', 'latest', basic_images, credentials=credentials) # Check the logs for the push. api_caller.conduct_auth('devtable', 'password') result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace) logs = result.json()['logs'] assert len(logs) == 2 assert logs[0]['kind'] == 'push_repo' assert logs[0]['metadata']['namespace'] == namespace assert logs[0]['metadata']['repo'] == 'newrepo' if expected_performer is not None: assert logs[0]['performer']['name'] == expected_performer # Pull the repository to verify. puller.pull(liveserver_session, namespace, 'newrepo', 'latest', basic_images, credentials=credentials) # Check the logs for the pull. result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace) logs = result.json()['logs'] assert len(logs) == 3 assert logs[0]['kind'] == 'pull_repo' assert logs[0]['metadata']['namespace'] == namespace assert logs[0]['metadata']['repo'] == 'newrepo' if expected_performer is not None: assert logs[0]['performer']['name'] == expected_performer def test_pull_publicrepo_anonymous(pusher, puller, basic_images, liveserver_session, app_reloader, api_caller, liveserver): """ Test: Pull a public repository anonymously. """ # Add a new repository under the public user, so we have a repository to pull. pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images, credentials=('public', 'password')) # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, expected_failure=Failures.UNAUTHORIZED) # Using a non-public user should also fail. puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password'), expected_failure=Failures.UNAUTHORIZED) # Make the repository public. api_caller.conduct_auth('public', 'password') api_caller.change_repo_visibility('public', 'newrepo', 'public') # Pull the repository anonymously, which should succeed because the repository is public. puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images) def test_pull_publicrepo_no_anonymous_access(pusher, puller, basic_images, liveserver_session, app_reloader, api_caller, liveserver, registry_server_executor): """ Test: Attempts to pull a public repository anonymously, with the feature flag disabled. """ # Add a new repository under the public user, so we have a repository to pull. pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images, credentials=('public', 'password')) # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, expected_failure=Failures.UNAUTHORIZED) # Using a non-public user should also fail. puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password'), expected_failure=Failures.UNAUTHORIZED) # Make the repository public. api_caller.conduct_auth('public', 'password') api_caller.change_repo_visibility('public', 'newrepo', 'public') with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)): # Attempt again to pull the (now public) repo anonymously, which should fail since # the feature flag for anonymous access is turned off. puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, expected_failure=Failures.ANONYMOUS_NOT_ALLOWED) # Using a non-public user should now succeed. puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password')) def test_basic_organization_flow(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Basic push and pull of an image to a new repository by members of an organization. """ # Add a new repository under the organization via the creator user. pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images, credentials=('creator', 'password')) # Ensure that the creator can pull it. puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images, credentials=('creator', 'password')) # Ensure that the admin can pull it. puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password')) # Ensure that the reader *cannot* pull it. puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images, credentials=('reader', 'password'), expected_failure=Failures.UNAUTHORIZED) def test_library_support(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Pushing and pulling from the implicit library namespace. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images, credentials=credentials) # Pull the repository to verify. puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images, credentials=credentials) # Pull the repository from the library namespace to verify. puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images, credentials=credentials) def test_library_namespace_with_support_disabled(pusher, puller, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Pushing and pulling from the explicit library namespace, even when the implicit one is disabled. """ credentials = ('devtable', 'password') with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)): # Push a new repository. pusher.push(liveserver_session, 'library', 'newrepo', 'latest', basic_images, credentials=credentials) # Pull the repository from the library namespace to verify. puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images, credentials=credentials) def test_push_library_with_support_disabled(pusher, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Pushing to the implicit library namespace, when disabled, should fail. """ credentials = ('devtable', 'password') with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)): # Attempt to push a new repository. pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE) def test_pull_library_with_support_disabled(puller, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Pushing to the implicit library namespace, when disabled, should fail. """ credentials = ('devtable', 'password') with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)): # Attempt to pull the repository from the library namespace. puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE) def test_image_replication(pusher, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Ensure that entries are created for replication of the images pushed. """ credentials = ('devtable', 'password') with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)): pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Ensure that entries were created for each image. for image in basic_images: r = registry_server_executor.on(liveserver).get_storage_replication_entry(image.id) assert r.text == 'OK' @pytest.mark.parametrize('repo_name, expected_failure', [ ('something', None), ('some/slash', Failures.SLASH_REPOSITORY), pytest.param('x' * 255, None, id='Valid long name'), pytest.param('x' * 256, Failures.INVALID_REPOSITORY, id='Name too long'), ]) def test_push_reponame(repo_name, expected_failure, pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Attempt to add a repository with various names. """ credentials = ('devtable', 'password') pusher.push(liveserver_session, 'devtable', repo_name, 'latest', basic_images, credentials=credentials, expected_failure=expected_failure) if expected_failure is None: puller.pull(liveserver_session, 'devtable', repo_name, 'latest', basic_images, credentials=credentials) @pytest.mark.parametrize('tag_name, expected_failure', [ ('l', None), ('1', None), ('x' * 128, None), ('', Failures.MISSING_TAG), ('x' * 129, Failures.INVALID_TAG), ('.fail', Failures.INVALID_TAG), ('-fail', Failures.INVALID_TAG), ]) def test_tag_validaton(tag_name, expected_failure, pusher, basic_images, liveserver_session, app_reloader): """ Test: Various forms of tags and whether they succeed or fail as expected. """ credentials = ('devtable', 'password') pusher.push(liveserver_session, 'devtable', 'newrepo', tag_name, basic_images, credentials=credentials, expected_failure=expected_failure) def test_invalid_parent(pusher, liveserver_session, app_reloader): """ Test: Attempt to push an image with an invalid/missing parent. """ images = [ Image(id='childimage', parent_id='parentimage', size=None, bytes=layer_bytes_for_contents('child')), ] credentials = ('devtable', 'password') pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, expected_failure=Failures.INVALID_IMAGES) def test_wrong_image_order(pusher, liveserver_session, app_reloader): """ Test: Attempt to push an image with its layers in the wrong order. """ images = [ Image(id='childimage', parent_id='parentimage', size=None, bytes=layer_bytes_for_contents('child')), Image(id='parentimage', parent_id=None, size=None, bytes=layer_bytes_for_contents('parent')), ] credentials = ('devtable', 'password') pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, expected_failure=Failures.INVALID_IMAGES) @pytest.mark.parametrize('labels', [ # Basic labels. [('foo', 'bar', 'text/plain'), ('baz', 'meh', 'text/plain')], # Theoretically invalid, but allowed when pushed via registry protocol. [('theoretically-invalid--label', 'foo', 'text/plain')], # JSON label. [('somejson', '{"some": "json"}', 'application/json'), ('plain', '', 'text/plain')], # JSON-esque (but not valid JSON) labels. [('foo', '[hello world]', 'text/plain'), ('bar', '{wassup?!}', 'text/plain')], ]) def test_labels(labels, manifest_protocol, liveserver_session, api_caller, app_reloader): """ Test: Image pushed with labels has those labels found in the database after the push succeeds. """ images = [ Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'), config={'Labels': {key: value for (key, value, _) in labels}}), ] credentials = ('devtable', 'password') result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials) digest = result.manifests['latest'].digest api_caller.conduct_auth('devtable', 'password') data = api_caller.get('/api/v1/repository/devtable/newrepo/manifest/%s/labels' % digest).json() labels_found = data['labels'] assert len(labels_found) == len(labels) labels_found_map = {l['key']: l for l in labels_found} assert set(images[0].config['Labels'].keys()) == set(labels_found_map.keys()) for key, _, media_type in labels: assert labels_found_map[key]['source_type'] == 'manifest' assert labels_found_map[key]['media_type'] == media_type @pytest.mark.parametrize('label_value, expected_expiration', [ ('1d', True), ('1h', True), ('2w', True), ('1g', False), ('something', False), ]) def test_expiration_label(label_value, expected_expiration, manifest_protocol, liveserver_session, api_caller, app_reloader): """ Test: Tag pushed with a valid `quay.expires-after` will have its expiration set to its start time plus the duration specified. If the duration is invalid, no expiration will be set. """ images = [ Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'), config={'Labels': {'quay.expires-after': label_value}}), ] credentials = ('devtable', 'password') manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials) api_caller.conduct_auth('devtable', 'password') tag_data = api_caller.get('/api/v1/repository/devtable/newrepo/tag').json()['tags'][0] if expected_expiration: diff = convert_to_timedelta(label_value).total_seconds() assert tag_data['end_ts'] == tag_data['start_ts'] + diff else: assert tag_data.get('end_ts') is None @pytest.mark.parametrize('content_type', [ 'application/vnd.oci.image.manifest.v1+json', 'application/vnd.docker.distribution.manifest.v2+json', ]) def test_unsupported_manifest_content_type(content_type, manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Attempt to push a manifest with an unsupported media type. """ credentials = ('devtable', 'password') options = ProtocolOptions() options.manifest_content_type = content_type # Attempt to push a new repository. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials, options=options, expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE) @pytest.mark.parametrize('accept_mimetypes', [ [('application/vnd.oci.image.manifest.v1+json', 1)], [('application/vnd.docker.distribution.manifest.v2+json', 1), ('application/vnd.docker.distribution.manifest.list.v2+json', 1)], [('application/vnd.foo.bar', 1)], ]) def test_unsupported_manifest_accept_headers(accept_mimetypes, manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Attempt to push a manifest with an unsupported accept headers. """ credentials = ('devtable', 'password') options = ProtocolOptions() options.manifest_content_type = DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE options.accept_mimetypes = str(Accept(accept_mimetypes)) # Attempt to push a new repository. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials, options=options, expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE) def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Attempt to push a manifest with an invalid blob reference. """ credentials = ('devtable', 'password') options = ProtocolOptions() options.manifest_invalid_blob_references = True # Attempt to push a new repository. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials, options=options, expected_failure=Failures.INVALID_BLOB) def test_delete_tag(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Push a repository, delete a tag, and attempt to pull. """ credentials = ('devtable', 'password') # Push the tags. result = pusher.push(liveserver_session, 'devtable', 'newrepo', ['one', 'two'], basic_images, credentials=credentials) # Delete tag `one` by digest or tag. pusher.delete(liveserver_session, 'devtable', 'newrepo', result.manifests['one'].digest if result.manifests else 'one', credentials=credentials) # Attempt to pull tag `one` and ensure it doesn't work. puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images, credentials=credentials, expected_failure=Failures.UNKNOWN_TAG) # Pull tag `two` to verify it works. puller.pull(liveserver_session, 'devtable', 'newrepo', 'two', basic_images, credentials=credentials) def test_cancel_upload(manifest_protocol, basic_images, liveserver_session, app_reloader): """ Test: Cancelation of blob uploads. """ credentials = ('devtable', 'password') options = ProtocolOptions() options.cancel_blob_upload = True manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) def test_blob_caching(manifest_protocol, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Pulling of blobs after initially pulled will result in the blobs being cached. """ credentials = ('devtable', 'password') # Push a tag. result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Conduct the initial pull to prime the cache. manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Disconnect the server from the database. registry_server_executor.on(liveserver).break_database() # Pull each blob, which should succeed due to caching. If caching is broken, this will # fail when it attempts to hit the database. for layer in result.manifests['latest'].layers: blob_id = str(layer.digest) r = liveserver_session.get('/v2/devtable/newrepo/blobs/%s' % blob_id, headers=result.headers) assert r.status_code == 200 @pytest.mark.parametrize('chunks', [ # Two chunks. [(0, 100), (100, None)], # Multiple chunks. [(0, 10), (10, 20), (20, None)], [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, None)], # Overlapping chunks. [(0, 1024), (10, None)], ]) def test_chunked_blob_uploading(chunks, random_layer_data, manifest_protocol, puller, liveserver_session, app_reloader): """ Test: Uploading of blobs as chunks. """ credentials = ('devtable', 'password') adjusted_chunks = [] for start_byte, end_byte in chunks: adjusted_chunks.append((start_byte, end_byte if end_byte else len(random_layer_data))) images = [ Image(id='theimage', parent_id=None, bytes=random_layer_data), ] options = ProtocolOptions() options.chunks_for_upload = adjusted_chunks # Push the image, using the specified chunking. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, options=options) # Pull to verify the image was created. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials) def test_chunked_uploading_mismatched_chunks(manifest_protocol, random_layer_data, liveserver_session, app_reloader): """ Test: Attempt to upload chunks with data missing. """ credentials = ('devtable', 'password') images = [ Image(id='theimage', parent_id=None, bytes=random_layer_data), ] # Note: Byte #100 is missing. options = ProtocolOptions() options.chunks_for_upload = [(0, 100), (101, len(random_layer_data), 416)] # Attempt to push, with the chunked upload failing. manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, options=options) def test_pull_disabled_namespace(pusher, puller, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Attempt to pull a repository from a disabled namespace results in an error. """ credentials = ('devtable', 'password') # Push a new repository. pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images, credentials=credentials) # Disable the namespace. registry_server_executor.on(liveserver).disable_namespace('buynlarge') # Attempt to pull, which should fail. puller.pull(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.NAMESPACE_DISABLED) def test_push_disabled_namespace(pusher, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Attempt to push a repository from a disabled namespace results in an error. """ credentials = ('devtable', 'password') # Disable the namespace. registry_server_executor.on(liveserver).disable_namespace('buynlarge') # Attempt to push, which should fail. pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images, credentials=credentials, expected_failure=Failures.NAMESPACE_DISABLED) def test_private_catalog_no_access(v2_protocol, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Ensure that accessing a private catalog with anonymous access results in no database connections. """ with FeatureFlagValue('PUBLIC_CATALOG', False, registry_server_executor.on(liveserver)): # Disconnect the server from the database. registry_server_executor.on(liveserver).break_database() results = v2_protocol.catalog(liveserver_session) assert not results @pytest.mark.parametrize('public_catalog, credentials, expected_repos', [ # No public access and no credentials => No results. (False, None, None), # Public access and no credentials => public repositories. (True, None, ['public/publicrepo']), # Private creds => private repositories. (False, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']), (True, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']), ]) @pytest.mark.parametrize('page_size', [ 1, 2, 10, 50, 100, ]) def test_catalog(public_catalog, credentials, expected_repos, page_size, v2_protocol, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Retrieving results from the V2 catalog. """ with FeatureFlagValue('PUBLIC_CATALOG', public_catalog, registry_server_executor.on(liveserver)): results = v2_protocol.catalog(liveserver_session, page_size=page_size, credentials=credentials, namespace='devtable', repo_name='simple') if expected_repos is None: assert len(results) == 0 else: assert set(expected_repos).issubset(set(results)) def test_catalog_caching(v2_protocol, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Calling the catalog after initially pulled will result in the catalog being cached. """ credentials = ('devtable', 'password') # Conduct the initial catalog call to prime the cache. results = v2_protocol.catalog(liveserver_session, credentials=credentials, namespace='devtable', repo_name='simple') token, _ = v2_protocol.auth(liveserver_session, credentials, 'devtable', 'simple') # Disconnect the server from the database. registry_server_executor.on(liveserver).break_database() # Call the catalog again, which should now be cached. cached_results = v2_protocol.catalog(liveserver_session, bearer_token=token) assert len(cached_results) == len(results) assert set(cached_results) == set(results) def test_catalog_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): credentials = ('devtable', 'password') # Get a valid token. token, _ = v2_protocol.auth(liveserver_session, credentials, 'devtable', 'simple') # Disable the devtable namespace. registry_server_executor.on(liveserver).disable_namespace('devtable') # Try to retrieve the catalog and ensure it fails to return any results. results = v2_protocol.catalog(liveserver_session, bearer_token=token) assert len(results) == 0 @pytest.mark.parametrize('username, namespace, repository', [ ('devtable', 'devtable', 'simple'), ('devtable', 'devtable', 'gargantuan'), ('public', 'public', 'publicrepo'), ('devtable', 'buynlarge', 'orgrepo'), ]) @pytest.mark.parametrize('page_size', [ 1, 2, 10, 50, 100, ]) def test_tags(username, namespace, repository, page_size, v2_protocol, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Retrieving results from the V2 catalog. """ credentials = (username, 'password') results = v2_protocol.tags(liveserver_session, page_size=page_size, credentials=credentials, namespace=namespace, repo_name=repository) expected_tags = [tag.name for tag in list_repository_tags(namespace, repository)] assert len(results) == len(expected_tags) assert set([r for r in results]) == set(expected_tags) def test_tags_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): credentials = ('devtable', 'password') # Disable the buynlarge namespace. registry_server_executor.on(liveserver).disable_namespace('buynlarge') # Try to retrieve the tags and ensure it fails. v2_protocol.tags(liveserver_session, credentials=credentials, namespace='buynlarge', repo_name='orgrepo', expected_failure=Failures.NAMESPACE_DISABLED) def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Retrieve a torrent for pulling the image via the Quay CLI. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=credentials) # Required for torrent. registry_server_executor.on(liveserver).set_supports_direct_download(True) # For each layer, retrieve a torrent for the blob. for image in basic_images: digest = 'sha256:' + hashlib.sha256(image.bytes).hexdigest() response = liveserver_session.get('/c1/torrent/devtable/newrepo/blobs/%s' % digest, auth=credentials) torrent_info = bencode.bdecode(response.content) # Check the announce URL. assert torrent_info['url-list'] == 'http://somefakeurl?goes=here' # Check the metadata. assert torrent_info.get('info', {}).get('pieces') is not None assert torrent_info.get('announce') is not None # Check the pieces. sha = resumablehashlib.sha1() sha.update(image.bytes) expected = binascii.hexlify(sha.digest()) found = binascii.hexlify(torrent_info['info']['pieces']) assert expected == found def test_squashed_image_disabled_namespace(pusher, sized_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Attempting to pull a squashed image from a disabled namespace. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images, credentials=credentials) # Disable the buynlarge namespace. registry_server_executor.on(liveserver).disable_namespace('buynlarge') # Attempt to pull the squashed version. response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials) assert response.status_code == 400 def test_squashed_image_disabled_user(pusher, sized_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Attempting to pull a squashed image via a disabled user. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images, credentials=credentials) # Disable the devtable namespace. registry_server_executor.on(liveserver).disable_namespace('devtable') # Attempt to pull the squashed version. response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials) assert response.status_code == 403 @pytest.mark.parametrize('use_estimates', [ False, True, ]) def test_multilayer_squashed_images(use_estimates, pusher, multi_layer_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Pulling of multilayer, complex squashed images. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images, credentials=credentials) if use_estimates: # Clear the uncompressed size stored for the images, to ensure that we estimate instead. for image in multi_layer_images: registry_server_executor.on(liveserver).clear_uncompressed_size(image.id) # Pull the squashed version. response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials) assert response.status_code == 200 tar = tarfile.open(fileobj=StringIO(response.content)) # Verify the squashed image. expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5' expected_names = ['repositories', expected_image_id, '%s/json' % expected_image_id, '%s/VERSION' % expected_image_id, '%s/layer.tar' % expected_image_id] assert tar.getnames() == expected_names # Verify the JSON image data. json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read()) # Ensure the JSON loads and parses. result = json.loads(json_data) assert result['id'] == expected_image_id assert result['config']['internal_id'] == 'layer5' # Ensure that squashed layer tar can be opened. tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) assert set(tar.getnames()) == {'contents', 'file1', 'file2', 'file3', 'file4'} # Check the contents of various files. assert tar.extractfile('contents').read() == 'layer 5 contents' assert tar.extractfile('file1').read() == 'from-layer-3' assert tar.extractfile('file2').read() == 'from-layer-2' assert tar.extractfile('file3').read() == 'from-layer-4' assert tar.extractfile('file4').read() == 'from-layer-5' @pytest.mark.parametrize('use_estimates', [ False, True, ]) def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Pulling of squashed images. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images, credentials=credentials) if use_estimates: # Clear the uncompressed size stored for the images, to ensure that we estimate instead. for image in sized_images: registry_server_executor.on(liveserver).clear_uncompressed_size(image.id) # Pull the squashed version. response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials) assert response.status_code == 200 tar = tarfile.open(fileobj=StringIO(response.content)) # Verify the squashed image. expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5' expected_names = ['repositories', expected_image_id, '%s/json' % expected_image_id, '%s/VERSION' % expected_image_id, '%s/layer.tar' % expected_image_id] assert tar.getnames() == expected_names # Verify the JSON image data. json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read()) # Ensure the JSON loads and parses. result = json.loads(json_data) assert result['id'] == expected_image_id assert result['config']['foo'] == 'childbar' # Ensure that squashed layer tar can be opened. tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) assert tar.getnames() == ['contents'] # Check the contents. assert tar.extractfile('contents').read() == 'some contents' EXPECTED_ACI_MANIFEST = { "acKind": "ImageManifest", "app": { "environment": [], "mountPoints": [], "group": "root", "user": "root", "workingDirectory": "/", "exec": [u'/bin/sh', u'-c', u'""hello""'], "isolators": [], "eventHandlers": [], "ports": [], "annotations": [ {"name": "created", "value": ""}, {"name": "homepage", "value": "http://localhost:5000/devtable/newrepo:latest"}, {"name": "quay.io/derived-image", "value": "035333848582cdb72d2bac4a0809bc7eed9d88004cfb3463562013fce53c2499"}, ] }, "labels": [ {"name": "version", "value": "latest"}, {"name": "arch", "value": "amd64"}, {"name": "os", "value": "linux"} ], "acVersion": "0.6.1", "name": "localhost/devtable/newrepo", } def test_aci_conversion(pusher, sized_images, liveserver_session, liveserver, registry_server_executor, app_reloader): """ Test: Pulling of ACI converted images. """ credentials = ('devtable', 'password') # Push an image to download. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images, credentials=credentials) # Pull the ACI version. response = liveserver_session.get('/c1/aci/server_name/devtable/newrepo/latest/aci/linux/amd64', auth=credentials) assert response.status_code == 200 tar = tarfile.open(fileobj=StringIO(response.content)) assert set(tar.getnames()) == {'manifest', 'rootfs', 'rootfs/contents'} assert tar.extractfile('rootfs/contents').read() == 'some contents' assert json.loads(tar.extractfile('manifest').read()) == EXPECTED_ACI_MANIFEST # Pull the ACI signature. response = liveserver_session.get('/c1/aci/server_name/devtable/newrepo/latest/aci.asc/linux/amd64', auth=credentials) assert response.status_code == 200 @pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [ # Successful mount, same namespace. ('devtable', 'devtable', 'baserepo', 'devtable/baserepo', None), # Successful mount, cross namespace. ('devtable', 'buynlarge', 'baserepo', 'buynlarge/baserepo', None), # Unsuccessful mount, unknown repo. ('devtable', 'devtable', 'baserepo', 'unknown/repohere', Failures.UNAUTHORIZED_FOR_MOUNT), # Unsuccessful mount, no access. ('public', 'public', 'baserepo', 'public/baserepo', Failures.UNAUTHORIZED_FOR_MOUNT), ]) def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, expected_failure, manifest_protocol, pusher, puller, basic_images, liveserver_session, app_reloader): # Push an image so we can attempt to mount it. pusher.push(liveserver_session, push_namespace, push_repo, 'latest', basic_images, credentials=(push_user, 'password')) # Push again, trying to mount the image layer(s) from the mount repo. options = ProtocolOptions() options.scopes = ['repository:devtable/newrepo:push,pull', 'repository:%s:pull' % (mount_repo_name)] options.mount_blobs = {image.id: mount_repo_name for image in basic_images} manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password'), options=options, expected_failure=expected_failure) if expected_failure is None: # Pull to ensure it worked. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, credentials=('devtable', 'password')) def get_robot_password(api_caller): api_caller.conduct_auth('devtable', 'password') resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot') return resp.json()['token'] def get_encrypted_password(api_caller): api_caller.conduct_auth('devtable', 'password') resp = api_caller.post('/api/v1/user/clientkey', data=json.dumps(dict(password='password')), headers={'Content-Type': 'application/json'}) return resp.json()['key'] @pytest.mark.parametrize('username, password, expect_success', [ # Invalid username. ('invaliduser', 'somepassword', False), # Invalid password. ('devtable', 'invalidpassword', False), # Invalid OAuth token. ('$oauthtoken', 'unknown', False), # Invalid CLI token. ('$app', 'invalid', False), # Valid. ('devtable', 'password', True), # Robot. ('buynlarge+ownerbot', get_robot_password, True), # Encrypted password. ('devtable', get_encrypted_password, True), # OAuth. ('$oauthtoken', 'test', True), # CLI Token. ('$app', 'test', True), ]) def test_login(username, password, expect_success, loginer, liveserver_session, api_caller, app_reloader): """ Test: Login flow. """ if not isinstance(password, str): password = password(api_caller) loginer.login(liveserver_session, username, password, [], expect_success) @pytest.mark.parametrize('username, password, scopes, expected_access, expect_success', [ # No scopes. ('devtable', 'password', [], [], True), # Basic pull. ('devtable', 'password', ['repository:devtable/simple:pull'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['pull']}, ], True), # Basic push. ('devtable', 'password', ['repository:devtable/simple:push'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push']}, ], True), # Basic push/pull. ('devtable', 'password', ['repository:devtable/simple:push,pull'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull']}, ], True), # Admin. ('devtable', 'password', ['repository:devtable/simple:push,pull,*'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']}, ], True), # Basic pull with endpoint. ('devtable', 'password', ['repository:localhost:5000/devtable/simple:pull'], [ {'type': 'repository', 'name': 'localhost:5000/devtable/simple', 'actions': ['pull']}, ], True), # Basic pull with invalid endpoint. ('devtable', 'password', ['repository:someinvalid/devtable/simple:pull'], [], False), # Pull with no access. ('public', 'password', ['repository:devtable/simple:pull'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': []}, ], True), # Anonymous push and pull on a private repository. ('', '', ['repository:devtable/simple:pull,push'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': []}, ], True), # Pull and push with no push access. ('reader', 'password', ['repository:buynlarge/orgrepo:pull,push'], [ {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']}, ], True), # OAuth. ('$oauthtoken', 'test', ['repository:public/publicrepo:pull,push'], [ {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']}, ], True), # Anonymous public repo. ('', '', ['repository:public/publicrepo:pull,push'], [ {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']}, ], True), # Multiple scopes. ('devtable', 'password', ['repository:devtable/simple:push,pull,*', 'repository:buynlarge/orgrepo:pull'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']}, {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']}, ], True), # Multiple scopes. ('devtable', 'password', ['repository:devtable/simple:push,pull,*', 'repository:public/publicrepo:push,pull'], [ {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']}, {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']}, ], True), ]) def test_login_scopes(username, password, scopes, expected_access, expect_success, v2_protocol, liveserver_session, api_caller, app_reloader): """ Test: Login via the V2 auth protocol reacts correctly to requested scopes. """ if not isinstance(password, str): password = password(api_caller) response = v2_protocol.login(liveserver_session, username, password, scopes, expect_success) if not expect_success: return # Validate the returned token. encoded = response.json()['token'] payload = decode_bearer_header('Bearer ' + encoded, instance_keys, {'SERVER_HOSTNAME': 'localhost:5000'}) assert payload is not None assert payload['access'] == expected_access def test_push_pull_same_blobs(pusher, puller, liveserver_session, app_reloader): """ Test: Push and pull of an image to a new repository where a blob is shared between layers. """ credentials = ('devtable', 'password') layer_bytes = layer_bytes_for_contents('some contents') images = [ Image(id='parentid', bytes=layer_bytes, parent_id=None), Image(id='someid', bytes=layer_bytes, parent_id='parentid'), ] options = ProtocolOptions() options.skip_head_checks = True # Since the blob will already exist. # Push a new repository. pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, options=options) # Pull the repository to verify. puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images, credentials=credentials, options=options)