Note that this change does *not* enable the new data model by default, but does allow it to be used when a special environment variable is specified.
		
			
				
	
	
		
			1342 lines
		
	
	
	
		
			54 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1342 lines
		
	
	
	
		
			54 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # pylint: disable=W0401, W0621, W0613, W0614, R0913
 | |
| import os
 | |
| import hashlib
 | |
| import tarfile
 | |
| 
 | |
| from cStringIO import StringIO
 | |
| 
 | |
| import binascii
 | |
| import bencode
 | |
| import resumablehashlib
 | |
| 
 | |
| from werkzeug.datastructures import Accept
 | |
| 
 | |
| from test.fixtures import *
 | |
| from test.registry.liveserverfixture import *
 | |
| from test.registry.fixtures import *
 | |
| from test.registry.protocol_fixtures import *
 | |
| 
 | |
| from test.registry.protocols import Failures, Image, layer_bytes_for_contents, ProtocolOptions
 | |
| 
 | |
| from app import instance_keys
 | |
| from data.model.tag import list_repository_tags
 | |
| from image.docker.schema1 import DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE
 | |
| from util.security.registry_jwt import decode_bearer_header
 | |
| from util.timedeltastring import convert_to_timedelta
 | |
| 
 | |
| 
 | |
| def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Basic push and pull of an image to a new repository. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_multi_layer_images_push_pull(pusher, puller, multi_layer_images, liveserver_session,
 | |
|                                       app_reloader):
 | |
|   """ Test: Basic push and pull of a multi-layered image to a new repository. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_overwrite_tag(pusher, puller, basic_images, different_images, liveserver_session,
 | |
|                        app_reloader):
 | |
|   """ Test: Basic push and pull of an image to a new repository, followed by a push to the same
 | |
|       tag with different images. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', different_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', different_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| @pytest.mark.skipif(os.getenv('OCI_DATA_MODEL') == 'true', reason="no backfill in new model")
 | |
| def test_no_tag_manifests(pusher, puller, basic_images, liveserver_session, app_reloader,
 | |
|                           liveserver, registry_server_executor):
 | |
|   """ Test: Basic pull without manifests. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Delete all tag manifests.
 | |
|   registry_server_executor.on(liveserver).delete_manifests()
 | |
| 
 | |
|   # Ensure we can still pull.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_basic_push_pull_by_manifest(manifest_protocol, basic_images, liveserver_session,
 | |
|                                      app_reloader):
 | |
|   """ Test: Basic push and pull-by-manifest of an image to a new repository. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                                   credentials=credentials)
 | |
| 
 | |
|   # Pull the repository by digests to verify.
 | |
|   digests = [str(manifest.digest) for manifest in result.manifests.values()]
 | |
|   manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', digests, basic_images,
 | |
|                          credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_basic_push_by_manifest_digest(manifest_protocol, basic_images, liveserver_session,
 | |
|                                        app_reloader):
 | |
|   """ Test: Basic push-by-manifest and pull-by-manifest of an image to a new repository. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   options = ProtocolOptions()
 | |
|   options.push_by_manifest_digest = True
 | |
| 
 | |
|   result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                                   credentials=credentials, options=options)
 | |
| 
 | |
|   # Pull the repository by digests to verify.
 | |
|   digests = [str(manifest.digest) for manifest in result.manifests.values()]
 | |
|   manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', digests, basic_images,
 | |
|                          credentials=credentials)
 | |
| 
 | |
| 
 | |
| 
 | |
| def test_push_invalid_credentials(pusher, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Ensure we get auth errors when trying to push with invalid credentials. """
 | |
|   invalid_credentials = ('devtable', 'notcorrectpassword')
 | |
| 
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED)
 | |
| 
 | |
| 
 | |
| def test_pull_invalid_credentials(puller, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Ensure we get auth errors when trying to pull with invalid credentials. """
 | |
|   invalid_credentials = ('devtable', 'notcorrectpassword')
 | |
| 
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=invalid_credentials, expected_failure=Failures.UNAUTHENTICATED)
 | |
| 
 | |
| 
 | |
| def test_push_pull_formerly_bad_repo_name(pusher, puller, basic_images, liveserver_session,
 | |
|                                           app_reloader):
 | |
|   """ Test: Basic push and pull of an image to a new repository with a name that formerly
 | |
|             failed. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'foo.bar', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_application_repo(pusher, puller, basic_images, liveserver_session, app_reloader,
 | |
|                           registry_server_executor, liveserver):
 | |
|   """ Test: Attempting to push or pull from an *application* repository raises a 405. """
 | |
|   credentials = ('devtable', 'password')
 | |
|   registry_server_executor.on(liveserver).create_app_repository('devtable', 'someapprepo')
 | |
| 
 | |
|   # Attempt to push to the repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
 | |
|               credentials=credentials, expected_failure=Failures.APP_REPOSITORY)
 | |
| 
 | |
|   # Attempt to pull from the repository.
 | |
|   puller.pull(liveserver_session, 'devtable', 'someapprepo', 'latest', basic_images,
 | |
|               credentials=credentials, expected_failure=Failures.APP_REPOSITORY)
 | |
| 
 | |
| 
 | |
| def test_middle_layer_different_sha(manifest_protocol, v1_protocol, liveserver_session,
 | |
|                                     app_reloader):
 | |
|   """ Test: Pushing of a 3-layer image with the *same* V1 ID's, but the middle layer having
 | |
|             different bytes, must result in new IDs being generated for the leaf layer, as
 | |
|             they point to different "images".
 | |
|   """
 | |
|   credentials = ('devtable', 'password')
 | |
|   first_images = [
 | |
|     Image(id='baseimage', parent_id=None, size=None, bytes=layer_bytes_for_contents('base')),
 | |
|     Image(id='middleimage', parent_id='baseimage', size=None,
 | |
|           bytes=layer_bytes_for_contents('middle')),
 | |
|     Image(id='leafimage', parent_id='middleimage', size=None,
 | |
|           bytes=layer_bytes_for_contents('leaf')),
 | |
|   ]
 | |
| 
 | |
|   # First push and pull the images, to ensure we have the basics setup and working.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', first_images,
 | |
|                          credentials=credentials)
 | |
|   first_pull_result = manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                                              first_images, credentials=credentials)
 | |
|   first_manifest = first_pull_result.manifests['latest']
 | |
|   assert set([image.id for image in first_images]) == set(first_manifest.image_ids)
 | |
|   assert first_pull_result.image_ids['latest'] == 'leafimage'
 | |
| 
 | |
|   # Next, create an image list with the middle image's *bytes* changed.
 | |
|   second_images = list(first_images)
 | |
|   second_images[1] = Image(id='middleimage', parent_id='baseimage', size=None,
 | |
|                            bytes=layer_bytes_for_contents('different middle bytes'))
 | |
| 
 | |
|   # Push and pull the image, ensuring that the produced ID for the middle and leaf layers
 | |
|   # are synthesized.
 | |
|   options = ProtocolOptions()
 | |
|   options.munge_shas = True
 | |
|   options.skip_head_checks = True
 | |
| 
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', second_images,
 | |
|                          credentials=credentials, options=options)
 | |
|   second_pull_result = v1_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                                         second_images, credentials=credentials, options=options)
 | |
| 
 | |
|   assert second_pull_result.image_ids['latest'] != 'leafimage'
 | |
| 
 | |
| 
 | |
| def add_robot(api_caller, _):
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
|   resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
 | |
|   return ('buynlarge+ownerbot', resp.json()['token'])
 | |
| 
 | |
| 
 | |
| def add_token(_, executor):
 | |
|   return ('$token', executor.add_token().text)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('credentials, namespace, expected_performer', [
 | |
|   (('devtable', 'password'), 'devtable', 'devtable'),
 | |
|   (add_robot, 'buynlarge', 'buynlarge+ownerbot'),
 | |
|   (('$oauthtoken', 'test'), 'devtable', 'devtable'),
 | |
|   (('$app', 'test'), 'devtable', 'devtable'),
 | |
|   (add_token, 'devtable', None),
 | |
| ])
 | |
| def test_push_pull_logging(credentials, namespace, expected_performer, pusher, puller, basic_images,
 | |
|                            liveserver_session, liveserver, api_caller, app_reloader,
 | |
|                            registry_server_executor):
 | |
|   """ Test: Basic push and pull, ensuring that logs are added for each operation. """
 | |
| 
 | |
|   # Create the repository before the test push.
 | |
|   start_images = [Image(id='startimage', parent_id=None, size=None,
 | |
|                         bytes=layer_bytes_for_contents('start image'))]
 | |
|   pusher.push(liveserver_session, namespace, 'newrepo', 'latest', start_images,
 | |
|               credentials=('devtable', 'password'))
 | |
| 
 | |
|   # Retrieve the credentials to use. This must be after the repo is created, because
 | |
|   # some credentials creation code adds the new entity to the repository.
 | |
|   if not isinstance(credentials, tuple):
 | |
|     credentials = credentials(api_caller, registry_server_executor.on(liveserver))
 | |
| 
 | |
|   # Push to the repository with the specified credentials.
 | |
|   pusher.push(liveserver_session, namespace, 'newrepo', 'anothertag', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Check the logs for the push.
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
| 
 | |
|   result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace)
 | |
|   logs = result.json()['logs']
 | |
|   assert len(logs) == 2
 | |
|   assert logs[0]['kind'] == 'push_repo'
 | |
|   assert logs[0]['metadata']['namespace'] == namespace
 | |
|   assert logs[0]['metadata']['repo'] == 'newrepo'
 | |
| 
 | |
|   if expected_performer is not None:
 | |
|     assert logs[0]['performer']['name'] == expected_performer
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, namespace, 'newrepo', 'anothertag', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Check the logs for the pull.
 | |
|   result = api_caller.get('/api/v1/repository/%s/newrepo/logs' % namespace)
 | |
|   logs = result.json()['logs']
 | |
|   assert len(logs) == 3
 | |
|   assert logs[0]['kind'] == 'pull_repo'
 | |
|   assert logs[0]['metadata']['namespace'] == namespace
 | |
|   assert logs[0]['metadata']['repo'] == 'newrepo'
 | |
| 
 | |
|   if expected_performer is not None:
 | |
|     assert logs[0]['performer']['name'] == expected_performer
 | |
| 
 | |
| 
 | |
| def test_pull_publicrepo_anonymous(pusher, puller, basic_images, liveserver_session,
 | |
|                                    app_reloader, api_caller, liveserver):
 | |
|   """ Test: Pull a public repository anonymously. """
 | |
|   # Add a new repository under the public user, so we have a repository to pull.
 | |
|   pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('public', 'password'))
 | |
| 
 | |
|   # First try to pull the (currently private) repo anonymously, which should fail (since it is
 | |
|   # private)
 | |
|   puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               expected_failure=Failures.UNAUTHORIZED)
 | |
| 
 | |
|   # Using a non-public user should also fail.
 | |
|   puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('devtable', 'password'),
 | |
|               expected_failure=Failures.UNAUTHORIZED)
 | |
| 
 | |
|   # Make the repository public.
 | |
|   api_caller.conduct_auth('public', 'password')
 | |
|   api_caller.change_repo_visibility('public', 'newrepo', 'public')
 | |
| 
 | |
|   # Pull the repository anonymously, which should succeed because the repository is public.
 | |
|   puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images)
 | |
| 
 | |
| 
 | |
| def test_pull_publicrepo_no_anonymous_access(pusher, puller, basic_images, liveserver_session,
 | |
|                                              app_reloader, api_caller, liveserver,
 | |
|                                              registry_server_executor):
 | |
|   """ Test: Attempts to pull a public repository anonymously, with the feature flag disabled. """
 | |
|   # Add a new repository under the public user, so we have a repository to pull.
 | |
|   pusher.push(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('public', 'password'))
 | |
| 
 | |
|   # First try to pull the (currently private) repo anonymously, which should fail (since it is
 | |
|   # private)
 | |
|   puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               expected_failure=Failures.UNAUTHORIZED)
 | |
| 
 | |
|   # Using a non-public user should also fail.
 | |
|   puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('devtable', 'password'),
 | |
|               expected_failure=Failures.UNAUTHORIZED)
 | |
| 
 | |
|   # Make the repository public.
 | |
|   api_caller.conduct_auth('public', 'password')
 | |
|   api_caller.change_repo_visibility('public', 'newrepo', 'public')
 | |
| 
 | |
|   with FeatureFlagValue('ANONYMOUS_ACCESS', False, registry_server_executor.on(liveserver)):
 | |
|     # Attempt again to pull the (now public) repo anonymously, which should fail since
 | |
|     # the feature flag for anonymous access is turned off.
 | |
|     puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|                 expected_failure=Failures.ANONYMOUS_NOT_ALLOWED)
 | |
| 
 | |
|     # Using a non-public user should now succeed.
 | |
|     puller.pull(liveserver_session, 'public', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=('devtable', 'password'))
 | |
| 
 | |
| 
 | |
| def test_basic_organization_flow(pusher, puller, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Basic push and pull of an image to a new repository by members of an organization. """
 | |
|   # Add a new repository under the organization via the creator user.
 | |
|   pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('creator', 'password'))
 | |
| 
 | |
|   # Ensure that the creator can pull it.
 | |
|   puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('creator', 'password'))
 | |
| 
 | |
|   # Ensure that the admin can pull it.
 | |
|   puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('devtable', 'password'))
 | |
| 
 | |
|   # Ensure that the reader *cannot* pull it.
 | |
|   puller.pull(liveserver_session, 'buynlarge', 'newrepo', 'latest', basic_images,
 | |
|               credentials=('reader', 'password'),
 | |
|               expected_failure=Failures.UNAUTHORIZED)
 | |
| 
 | |
| 
 | |
| def test_library_support(pusher, puller, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Pushing and pulling from the implicit library namespace. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the repository from the library namespace to verify.
 | |
|   puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_library_namespace_with_support_disabled(pusher, puller, basic_images, liveserver_session,
 | |
|                                                  app_reloader, liveserver,
 | |
|                                                  registry_server_executor):
 | |
|   """ Test: Pushing and pulling from the explicit library namespace, even when the
 | |
|             implicit one is disabled.
 | |
|   """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
 | |
|     # Push a new repository.
 | |
|     pusher.push(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=credentials)
 | |
| 
 | |
|     # Pull the repository from the library namespace to verify.
 | |
|     puller.pull(liveserver_session, 'library', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_push_library_with_support_disabled(pusher, basic_images, liveserver_session,
 | |
|                                             app_reloader, liveserver,
 | |
|                                             registry_server_executor):
 | |
|   """ Test: Pushing to the implicit library namespace, when disabled,
 | |
|             should fail.
 | |
|   """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
 | |
|     # Attempt to push a new repository.
 | |
|     pusher.push(liveserver_session, '', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=credentials,
 | |
|                 expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
 | |
| 
 | |
| 
 | |
| def test_pull_library_with_support_disabled(puller, basic_images, liveserver_session,
 | |
|                                             app_reloader, liveserver,
 | |
|                                             registry_server_executor):
 | |
|   """ Test: Pushing to the implicit library namespace, when disabled,
 | |
|             should fail.
 | |
|   """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   with FeatureFlagValue('LIBRARY_SUPPORT', False, registry_server_executor.on(liveserver)):
 | |
|     # Attempt to pull the repository from the library namespace.
 | |
|     puller.pull(liveserver_session, '', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=credentials,
 | |
|                 expected_failure=Failures.DISALLOWED_LIBRARY_NAMESPACE)
 | |
| 
 | |
| 
 | |
| def test_image_replication(pusher, basic_images, liveserver_session, app_reloader, liveserver,
 | |
|                            registry_server_executor):
 | |
|   """ Test: Ensure that entries are created for replication of the images pushed. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)):
 | |
|     pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=credentials)
 | |
| 
 | |
|     # Ensure that entries were created for each image.
 | |
|     for image in basic_images:
 | |
|       r = registry_server_executor.on(liveserver).get_storage_replication_entry(image.id)
 | |
|       assert r.text == 'OK'
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('repo_name, expected_failure', [
 | |
|   ('something', None),
 | |
|   ('some/slash', Failures.SLASH_REPOSITORY),
 | |
|   pytest.param('x' * 255, None, id='Valid long name'),
 | |
|   pytest.param('x' * 256, Failures.INVALID_REPOSITORY, id='Name too long'),
 | |
| ])
 | |
| def test_push_reponame(repo_name, expected_failure, pusher, puller, basic_images,
 | |
|                        liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to add a repository with various names.
 | |
|   """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   pusher.push(liveserver_session, 'devtable', repo_name, 'latest', basic_images,
 | |
|               credentials=credentials,
 | |
|               expected_failure=expected_failure)
 | |
| 
 | |
|   if expected_failure is None:
 | |
|     puller.pull(liveserver_session, 'devtable', repo_name, 'latest', basic_images,
 | |
|                 credentials=credentials)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('tag_name, expected_failure', [
 | |
|   ('l', None),
 | |
|   ('1', None),
 | |
|   ('x' * 128, None),
 | |
| 
 | |
|   ('', Failures.MISSING_TAG),
 | |
|   ('x' * 129, Failures.INVALID_TAG),
 | |
|   ('.fail', Failures.INVALID_TAG),
 | |
|   ('-fail', Failures.INVALID_TAG),
 | |
| ])
 | |
| def test_tag_validaton(tag_name, expected_failure, pusher, basic_images, liveserver_session,
 | |
|                        app_reloader):
 | |
|   """ Test: Various forms of tags and whether they succeed or fail as expected. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', tag_name, basic_images,
 | |
|               credentials=credentials,
 | |
|               expected_failure=expected_failure)
 | |
| 
 | |
| 
 | |
| def test_invalid_parent(pusher, liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to push an image with an invalid/missing parent. """
 | |
|   images = [
 | |
|     Image(id='childimage', parent_id='parentimage', size=None,
 | |
|           bytes=layer_bytes_for_contents('child')),
 | |
|   ]
 | |
| 
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|               credentials=credentials,
 | |
|               expected_failure=Failures.INVALID_IMAGES)
 | |
| 
 | |
| 
 | |
| def test_wrong_image_order(pusher, liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to push an image with its layers in the wrong order. """
 | |
|   images = [
 | |
|     Image(id='childimage', parent_id='parentimage', size=None,
 | |
|           bytes=layer_bytes_for_contents('child')),
 | |
|     Image(id='parentimage', parent_id=None, size=None,
 | |
|           bytes=layer_bytes_for_contents('parent')),
 | |
|   ]
 | |
| 
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|               credentials=credentials,
 | |
|               expected_failure=Failures.INVALID_IMAGES)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('labels', [
 | |
|   # Basic labels.
 | |
|   [('foo', 'bar', 'text/plain'), ('baz', 'meh', 'text/plain')],
 | |
| 
 | |
|   # Theoretically invalid, but allowed when pushed via registry protocol.
 | |
|   [('theoretically-invalid--label', 'foo', 'text/plain')],
 | |
| 
 | |
|   # JSON label.
 | |
|   [('somejson', '{"some": "json"}', 'application/json'), ('plain', '', 'text/plain')],
 | |
| 
 | |
|   # JSON-esque (but not valid JSON) labels.
 | |
|   [('foo', '[hello world]', 'text/plain'), ('bar', '{wassup?!}', 'text/plain')],
 | |
| ])
 | |
| def test_labels(labels, manifest_protocol, liveserver_session, api_caller, app_reloader):
 | |
|   """ Test: Image pushed with labels has those labels found in the database after the
 | |
|       push succeeds.
 | |
|   """
 | |
|   images = [
 | |
|     Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
 | |
|           config={'Labels': {key: value for (key, value, _) in labels}}),
 | |
|   ]
 | |
| 
 | |
|   credentials = ('devtable', 'password')
 | |
|   result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|                                   credentials=credentials)
 | |
| 
 | |
|   digest = result.manifests['latest'].digest
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
| 
 | |
|   data = api_caller.get('/api/v1/repository/devtable/newrepo/manifest/%s/labels' % digest).json()
 | |
|   labels_found = data['labels']
 | |
|   assert len(labels_found) == len(labels)
 | |
| 
 | |
|   labels_found_map = {l['key']: l for l in labels_found}
 | |
|   assert set(images[0].config['Labels'].keys()) == set(labels_found_map.keys())
 | |
| 
 | |
|   for key, _, media_type in labels:
 | |
|     assert labels_found_map[key]['source_type'] == 'manifest'
 | |
|     assert labels_found_map[key]['media_type'] == media_type
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('label_value, expected_expiration', [
 | |
|   ('1d', True),
 | |
|   ('1h', True),
 | |
|   ('2w', True),
 | |
| 
 | |
|   ('1g', False),
 | |
|   ('something', False),
 | |
| ])
 | |
| def test_expiration_label(label_value, expected_expiration, manifest_protocol, liveserver_session,
 | |
|                           api_caller, app_reloader):
 | |
|   """ Test: Tag pushed with a valid `quay.expires-after` will have its expiration set to its
 | |
|             start time plus the duration specified. If the duration is invalid, no expiration will
 | |
|             be set.
 | |
|   """
 | |
|   images = [
 | |
|     Image(id='theimage', parent_id=None, bytes=layer_bytes_for_contents('image'),
 | |
|           config={'Labels': {'quay.expires-after': label_value}}),
 | |
|   ]
 | |
| 
 | |
|   credentials = ('devtable', 'password')
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|                          credentials=credentials)
 | |
| 
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
| 
 | |
|   tag_data = api_caller.get('/api/v1/repository/devtable/newrepo/tag').json()['tags'][0]
 | |
|   if expected_expiration:
 | |
|     diff = convert_to_timedelta(label_value).total_seconds()
 | |
|     assert tag_data['end_ts'] == tag_data['start_ts'] + diff
 | |
|   else:
 | |
|     assert tag_data.get('end_ts') is None
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('content_type', [
 | |
|   'application/vnd.oci.image.manifest.v1+json',
 | |
|   'application/vnd.docker.distribution.manifest.v2+json',
 | |
| ])
 | |
| def test_unsupported_manifest_content_type(content_type, manifest_protocol, basic_images,
 | |
|                                            liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to push a manifest with an unsupported media type. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.manifest_content_type = content_type
 | |
| 
 | |
|   # Attempt to push a new repository.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                          credentials=credentials,
 | |
|                          options=options,
 | |
|                          expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('accept_mimetypes', [
 | |
|   [('application/vnd.oci.image.manifest.v1+json', 1)],
 | |
|   [('application/vnd.docker.distribution.manifest.v2+json', 1),
 | |
|    ('application/vnd.docker.distribution.manifest.list.v2+json', 1)],
 | |
|   [('application/vnd.foo.bar', 1)],
 | |
| ])
 | |
| def test_unsupported_manifest_accept_headers(accept_mimetypes, manifest_protocol, basic_images,
 | |
|                                              liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to push a manifest with an unsupported accept headers. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.manifest_content_type = DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE
 | |
|   options.accept_mimetypes = str(Accept(accept_mimetypes))
 | |
| 
 | |
|   # Attempt to push a new repository.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                          credentials=credentials,
 | |
|                          options=options,
 | |
|                          expected_failure=Failures.UNSUPPORTED_CONTENT_TYPE)
 | |
| 
 | |
| 
 | |
| def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to push a manifest with an invalid blob reference. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.manifest_invalid_blob_references = True
 | |
| 
 | |
|   # Attempt to push a new repository.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                          credentials=credentials,
 | |
|                          options=options,
 | |
|                          expected_failure=Failures.INVALID_BLOB)
 | |
| 
 | |
| 
 | |
| def test_delete_tag(pusher, puller, basic_images, liveserver_session,
 | |
|                     app_reloader):
 | |
|   """ Test: Push a repository, delete a tag, and attempt to pull. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push the tags.
 | |
|   result = pusher.push(liveserver_session, 'devtable', 'newrepo', ['one', 'two'],
 | |
|                        basic_images, credentials=credentials)
 | |
| 
 | |
|   # Delete tag `one` by digest or tag.
 | |
|   pusher.delete(liveserver_session, 'devtable', 'newrepo',
 | |
|                 result.manifests['one'].digest if result.manifests else 'one',
 | |
|                 credentials=credentials)
 | |
| 
 | |
|   # Attempt to pull tag `one` and ensure it doesn't work.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images,
 | |
|               credentials=credentials,
 | |
|               expected_failure=Failures.UNKNOWN_TAG)
 | |
| 
 | |
|   # Pull tag `two` to verify it works.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'two', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_cancel_upload(manifest_protocol, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Cancelation of blob uploads. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.cancel_blob_upload = True
 | |
| 
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                          basic_images, credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_blob_caching(manifest_protocol, basic_images, liveserver_session, app_reloader,
 | |
|                       liveserver, registry_server_executor):
 | |
|   """ Test: Pulling of blobs after initially pulled will result in the blobs being cached. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a tag.
 | |
|   result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                                   basic_images, credentials=credentials)
 | |
| 
 | |
| 
 | |
|   # Conduct the initial pull to prime the cache.
 | |
|   manifest_protocol.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                          credentials=credentials)
 | |
| 
 | |
|   # Disconnect the server from the database.
 | |
|   registry_server_executor.on(liveserver).break_database()
 | |
| 
 | |
|   # Pull each blob, which should succeed due to caching. If caching is broken, this will
 | |
|   # fail when it attempts to hit the database.
 | |
|   for layer in result.manifests['latest'].layers:
 | |
|     blob_id = str(layer.digest)
 | |
|     r = liveserver_session.get('/v2/devtable/newrepo/blobs/%s' % blob_id, headers=result.headers)
 | |
|     assert r.status_code == 200
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('chunks', [
 | |
|   # Two chunks.
 | |
|   [(0, 100), (100, None)],
 | |
| 
 | |
|   # Multiple chunks.
 | |
|   [(0, 10), (10, 20), (20, None)],
 | |
|   [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, None)],
 | |
| 
 | |
|   # Overlapping chunks.
 | |
|   [(0, 1024), (10, None)],
 | |
| ])
 | |
| def test_chunked_blob_uploading(chunks, random_layer_data, manifest_protocol, puller,
 | |
|                                 liveserver_session, app_reloader):
 | |
|   """ Test: Uploading of blobs as chunks. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   adjusted_chunks = []
 | |
|   for start_byte, end_byte in chunks:
 | |
|     adjusted_chunks.append((start_byte, end_byte if end_byte else len(random_layer_data)))
 | |
| 
 | |
|   images = [
 | |
|     Image(id='theimage', parent_id=None, bytes=random_layer_data),
 | |
|   ]
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.chunks_for_upload = adjusted_chunks
 | |
| 
 | |
|   # Push the image, using the specified chunking.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                          images, credentials=credentials, options=options)
 | |
| 
 | |
|   # Pull to verify the image was created.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|               credentials=credentials)
 | |
| 
 | |
| 
 | |
| def test_chunked_uploading_mismatched_chunks(manifest_protocol, random_layer_data,
 | |
|                                              liveserver_session, app_reloader):
 | |
|   """ Test: Attempt to upload chunks with data missing. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   images = [
 | |
|     Image(id='theimage', parent_id=None, bytes=random_layer_data),
 | |
|   ]
 | |
| 
 | |
|   # Note: Byte #100 is missing.
 | |
|   options = ProtocolOptions()
 | |
|   options.chunks_for_upload = [(0, 100), (101, len(random_layer_data), 416)]
 | |
| 
 | |
|   # Attempt to push, with the chunked upload failing.
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest',
 | |
|                          images, credentials=credentials, options=options)
 | |
| 
 | |
| 
 | |
| def test_pull_disabled_namespace(pusher, puller, basic_images, liveserver_session,
 | |
|                                  app_reloader, liveserver, registry_server_executor):
 | |
|   """ Test: Attempt to pull a repository from a disabled namespace results in an error. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Disable the namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('buynlarge')
 | |
| 
 | |
|   # Attempt to pull, which should fail.
 | |
|   puller.pull(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images,
 | |
|               credentials=credentials, expected_failure=Failures.NAMESPACE_DISABLED)
 | |
| 
 | |
| 
 | |
| def test_push_disabled_namespace(pusher, basic_images, liveserver_session,
 | |
|                                  app_reloader, liveserver, registry_server_executor):
 | |
|   """ Test: Attempt to push a repository from a disabled namespace results in an error. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Disable the namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('buynlarge')
 | |
| 
 | |
|   # Attempt to push, which should fail.
 | |
|   pusher.push(liveserver_session, 'buynlarge', 'someneworgrepo', 'latest', basic_images,
 | |
|               credentials=credentials, expected_failure=Failures.NAMESPACE_DISABLED)
 | |
| 
 | |
| 
 | |
| def test_private_catalog_no_access(v2_protocol, liveserver_session, app_reloader, liveserver,
 | |
|                                    registry_server_executor):
 | |
|   """ Test: Ensure that accessing a private catalog with anonymous access results in no database
 | |
|       connections.
 | |
|   """
 | |
|   with FeatureFlagValue('PUBLIC_CATALOG', False, registry_server_executor.on(liveserver)):
 | |
|     # Disconnect the server from the database.
 | |
|     registry_server_executor.on(liveserver).break_database()
 | |
| 
 | |
|     results = v2_protocol.catalog(liveserver_session)
 | |
|     assert not results
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('public_catalog, credentials, expected_repos', [
 | |
|   # No public access and no credentials => No results.
 | |
|   (False, None, None),
 | |
| 
 | |
|   # Public access and no credentials => public repositories.
 | |
|   (True, None, ['public/publicrepo']),
 | |
| 
 | |
|   # Private creds => private repositories.
 | |
|   (False, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
 | |
|   (True, ('devtable', 'password'), ['devtable/simple', 'devtable/complex', 'devtable/gargantuan']),
 | |
| ])
 | |
| @pytest.mark.parametrize('page_size', [
 | |
|   1,
 | |
|   2,
 | |
|   10,
 | |
|   50,
 | |
|   100,
 | |
| ])
 | |
| def test_catalog(public_catalog, credentials, expected_repos, page_size, v2_protocol,
 | |
|                  liveserver_session, app_reloader, liveserver, registry_server_executor):
 | |
|   """ Test: Retrieving results from the V2 catalog. """
 | |
|   with FeatureFlagValue('PUBLIC_CATALOG', public_catalog, registry_server_executor.on(liveserver)):
 | |
|     results = v2_protocol.catalog(liveserver_session, page_size=page_size, credentials=credentials,
 | |
|                                   namespace='devtable', repo_name='simple')
 | |
| 
 | |
|   if expected_repos is None:
 | |
|     assert len(results) == 0
 | |
|   else:
 | |
|     assert set(expected_repos).issubset(set(results))
 | |
| 
 | |
| 
 | |
| def test_catalog_caching(v2_protocol, basic_images, liveserver_session, app_reloader,
 | |
|                          liveserver, registry_server_executor):
 | |
|   """ Test: Calling the catalog after initially pulled will result in the catalog being cached. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Conduct the initial catalog call to prime the cache.
 | |
|   results = v2_protocol.catalog(liveserver_session, credentials=credentials,
 | |
|                                 namespace='devtable', repo_name='simple')
 | |
| 
 | |
|   token, _ = v2_protocol.auth(liveserver_session, credentials, 'devtable', 'simple')
 | |
| 
 | |
|   # Disconnect the server from the database.
 | |
|   registry_server_executor.on(liveserver).break_database()
 | |
| 
 | |
|   # Call the catalog again, which should now be cached.
 | |
|   cached_results = v2_protocol.catalog(liveserver_session, bearer_token=token)
 | |
|   assert len(cached_results) == len(results)
 | |
|   assert set(cached_results) == set(results)
 | |
| 
 | |
| 
 | |
| def test_catalog_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader,
 | |
|                                     liveserver, registry_server_executor):
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Get a valid token.
 | |
|   token, _ = v2_protocol.auth(liveserver_session, credentials, 'devtable', 'simple')
 | |
| 
 | |
|   # Disable the devtable namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('devtable')
 | |
| 
 | |
|   # Try to retrieve the catalog and ensure it fails to return any results.
 | |
|   results = v2_protocol.catalog(liveserver_session, bearer_token=token)
 | |
|   assert len(results) == 0
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('username, namespace, repository', [
 | |
|   ('devtable', 'devtable', 'simple'),
 | |
|   ('devtable', 'devtable', 'gargantuan'),
 | |
|   ('public', 'public', 'publicrepo'),
 | |
|   ('devtable', 'buynlarge', 'orgrepo'),
 | |
| ])
 | |
| @pytest.mark.parametrize('page_size', [
 | |
|   1,
 | |
|   2,
 | |
|   10,
 | |
|   50,
 | |
|   100,
 | |
| ])
 | |
| def test_tags(username, namespace, repository, page_size, v2_protocol, liveserver_session,
 | |
|               app_reloader, liveserver, registry_server_executor):
 | |
|   """ Test: Retrieving results from the V2 catalog. """
 | |
|   credentials = (username, 'password')
 | |
|   results = v2_protocol.tags(liveserver_session, page_size=page_size, credentials=credentials,
 | |
|                              namespace=namespace, repo_name=repository)
 | |
| 
 | |
|   expected_tags = [tag.name for tag in list_repository_tags(namespace, repository)]
 | |
|   assert len(results) == len(expected_tags)
 | |
|   assert set([r for r in results]) == set(expected_tags)
 | |
| 
 | |
| 
 | |
| def test_tags_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader,
 | |
|                                  liveserver, registry_server_executor):
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Disable the buynlarge namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('buynlarge')
 | |
| 
 | |
|   # Try to retrieve the tags and ensure it fails.
 | |
|   v2_protocol.tags(liveserver_session, credentials=credentials, namespace='buynlarge',
 | |
|                    repo_name='orgrepo', expected_failure=Failures.NAMESPACE_DISABLED)
 | |
| 
 | |
| 
 | |
| def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
 | |
|                       registry_server_executor, app_reloader):
 | |
|   """ Test: Retrieve a torrent for pulling the image via the Quay CLI. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Required for torrent.
 | |
|   registry_server_executor.on(liveserver).set_supports_direct_download(True)
 | |
| 
 | |
|   # For each layer, retrieve a torrent for the blob.
 | |
|   for image in basic_images:
 | |
|     digest = 'sha256:' + hashlib.sha256(image.bytes).hexdigest()
 | |
|     response = liveserver_session.get('/c1/torrent/devtable/newrepo/blobs/%s' % digest,
 | |
|                                       auth=credentials)
 | |
|     torrent_info = bencode.bdecode(response.content)
 | |
| 
 | |
|     # Check the announce URL.
 | |
|     assert torrent_info['url-list'] == 'http://somefakeurl?goes=here'
 | |
| 
 | |
|     # Check the metadata.
 | |
|     assert torrent_info.get('info', {}).get('pieces') is not None
 | |
|     assert torrent_info.get('announce') is not None
 | |
| 
 | |
|     # Check the pieces.
 | |
|     sha = resumablehashlib.sha1()
 | |
|     sha.update(image.bytes)
 | |
| 
 | |
|     expected = binascii.hexlify(sha.digest())
 | |
|     found = binascii.hexlify(torrent_info['info']['pieces'])
 | |
|     assert expected == found
 | |
| 
 | |
| 
 | |
| def test_squashed_image_disabled_namespace(pusher, sized_images, liveserver_session,
 | |
|                                            liveserver, registry_server_executor, app_reloader):
 | |
|   """ Test: Attempting to pull a squashed image from a disabled namespace. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Disable the buynlarge namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('buynlarge')
 | |
| 
 | |
|   # Attempt to pull the squashed version.
 | |
|   response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials)
 | |
|   assert response.status_code == 400
 | |
| 
 | |
| 
 | |
| def test_squashed_image_disabled_user(pusher, sized_images, liveserver_session,
 | |
|                                       liveserver, registry_server_executor, app_reloader):
 | |
|   """ Test: Attempting to pull a squashed image via a disabled user. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Disable the devtable namespace.
 | |
|   registry_server_executor.on(liveserver).disable_namespace('devtable')
 | |
| 
 | |
|   # Attempt to pull the squashed version.
 | |
|   response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials)
 | |
|   assert response.status_code == 403
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('use_estimates', [
 | |
|   False,
 | |
|   True,
 | |
| ])
 | |
| def test_multilayer_squashed_images(use_estimates, pusher, multi_layer_images, liveserver_session,
 | |
|                                     liveserver, registry_server_executor, app_reloader):
 | |
|   """ Test: Pulling of multilayer, complex squashed images. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   if use_estimates:
 | |
|     # Clear the uncompressed size stored for the images, to ensure that we estimate instead.
 | |
|     for image in multi_layer_images:
 | |
|       registry_server_executor.on(liveserver).clear_uncompressed_size(image.id)
 | |
| 
 | |
|   # Pull the squashed version.
 | |
|   response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
 | |
|   assert response.status_code == 200
 | |
| 
 | |
|   tar = tarfile.open(fileobj=StringIO(response.content))
 | |
| 
 | |
|   # Verify the squashed image.
 | |
|   expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
 | |
|   expected_names = ['repositories',
 | |
|                     expected_image_id,
 | |
|                     '%s/json' % expected_image_id,
 | |
|                     '%s/VERSION' % expected_image_id,
 | |
|                     '%s/layer.tar' % expected_image_id]
 | |
| 
 | |
|   assert tar.getnames() == expected_names
 | |
| 
 | |
|   # Verify the JSON image data.
 | |
|   json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read())
 | |
| 
 | |
|   # Ensure the JSON loads and parses.
 | |
|   result = json.loads(json_data)
 | |
|   assert result['id'] == expected_image_id
 | |
|   assert result['config']['internal_id'] == 'layer5'
 | |
| 
 | |
|   # Ensure that squashed layer tar can be opened.
 | |
|   tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
 | |
|   assert set(tar.getnames()) == {'contents', 'file1', 'file2', 'file3', 'file4'}
 | |
| 
 | |
|   # Check the contents of various files.
 | |
|   assert tar.extractfile('contents').read() == 'layer 5 contents'
 | |
|   assert tar.extractfile('file1').read() == 'from-layer-3'
 | |
|   assert tar.extractfile('file2').read() == 'from-layer-2'
 | |
|   assert tar.extractfile('file3').read() == 'from-layer-4'
 | |
|   assert tar.extractfile('file4').read() == 'from-layer-5'
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('use_estimates', [
 | |
|   False,
 | |
|   True,
 | |
| ])
 | |
| def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session,
 | |
|                          liveserver, registry_server_executor, app_reloader):
 | |
|   """ Test: Pulling of squashed images. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   if use_estimates:
 | |
|     # Clear the uncompressed size stored for the images, to ensure that we estimate instead.
 | |
|     for image in sized_images:
 | |
|       registry_server_executor.on(liveserver).clear_uncompressed_size(image.id)
 | |
| 
 | |
|   # Pull the squashed version.
 | |
|   response = liveserver_session.get('/c1/squash/devtable/newrepo/latest', auth=credentials)
 | |
|   assert response.status_code == 200
 | |
| 
 | |
|   tar = tarfile.open(fileobj=StringIO(response.content))
 | |
| 
 | |
|   # Verify the squashed image.
 | |
|   expected_image_id = 'cdc6d6c0d07d2cbacfc579e49ce0c256c5084b9b2b16c1b1b0c45f26a12a4ba5'
 | |
|   expected_names = ['repositories',
 | |
|                     expected_image_id,
 | |
|                     '%s/json' % expected_image_id,
 | |
|                     '%s/VERSION' % expected_image_id,
 | |
|                     '%s/layer.tar' % expected_image_id]
 | |
| 
 | |
|   assert tar.getnames() == expected_names
 | |
| 
 | |
|   # Verify the JSON image data.
 | |
|   json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read())
 | |
| 
 | |
|   # Ensure the JSON loads and parses.
 | |
|   result = json.loads(json_data)
 | |
|   assert result['id'] == expected_image_id
 | |
|   assert result['config']['foo'] == 'childbar'
 | |
| 
 | |
|   # Ensure that squashed layer tar can be opened.
 | |
|   tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id)))
 | |
|   assert tar.getnames() == ['contents']
 | |
| 
 | |
|   # Check the contents.
 | |
|   assert tar.extractfile('contents').read() == 'some contents'
 | |
| 
 | |
| 
 | |
| EXPECTED_ACI_MANIFEST = {
 | |
|   "acKind": "ImageManifest",
 | |
|   "app": {
 | |
|     "environment": [],
 | |
|     "mountPoints": [],
 | |
|     "group": "root",
 | |
|     "user": "root",
 | |
|     "workingDirectory": "/",
 | |
|     "exec": [u'/bin/sh', u'-c', u'""hello""'],
 | |
|     "isolators": [],
 | |
|     "eventHandlers": [],
 | |
|     "ports": [],
 | |
|     "annotations": [
 | |
|       {"name": "created", "value": ""},
 | |
|       {"name": "homepage", "value": "http://localhost:5000/devtable/newrepo:latest"},
 | |
|       {"name": "quay.io/derived-image",
 | |
|        "value": "035333848582cdb72d2bac4a0809bc7eed9d88004cfb3463562013fce53c2499"},
 | |
|     ]
 | |
|   },
 | |
|   "labels": [
 | |
|     {"name": "version", "value": "latest"},
 | |
|     {"name": "arch", "value": "amd64"},
 | |
|     {"name": "os", "value": "linux"}
 | |
|   ],
 | |
|   "acVersion": "0.6.1",
 | |
|   "name": "localhost/devtable/newrepo",
 | |
| }
 | |
| 
 | |
| 
 | |
| def test_aci_conversion(pusher, sized_images, liveserver_session,
 | |
|                         liveserver, registry_server_executor, app_reloader):
 | |
|   """ Test: Pulling of ACI converted images. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push an image to download.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', sized_images,
 | |
|               credentials=credentials)
 | |
| 
 | |
|   # Pull the ACI version.
 | |
|   response = liveserver_session.get('/c1/aci/server_name/devtable/newrepo/latest/aci/linux/amd64',
 | |
|                                     auth=credentials)
 | |
|   assert response.status_code == 200
 | |
|   tar = tarfile.open(fileobj=StringIO(response.content))
 | |
|   assert set(tar.getnames()) == {'manifest', 'rootfs', 'rootfs/contents'}
 | |
| 
 | |
|   assert tar.extractfile('rootfs/contents').read() == 'some contents'
 | |
|   assert json.loads(tar.extractfile('manifest').read()) == EXPECTED_ACI_MANIFEST
 | |
| 
 | |
|   # Pull the ACI signature.
 | |
|   response = liveserver_session.get('/c1/aci/server_name/devtable/newrepo/latest/aci.asc/linux/amd64',
 | |
|                                     auth=credentials)
 | |
|   assert response.status_code == 200
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [
 | |
|   # Successful mount, same namespace.
 | |
|   ('devtable', 'devtable', 'baserepo', 'devtable/baserepo', None),
 | |
| 
 | |
|   # Successful mount, cross namespace.
 | |
|   ('devtable', 'buynlarge', 'baserepo', 'buynlarge/baserepo', None),
 | |
| 
 | |
|   # Unsuccessful mount, unknown repo.
 | |
|   ('devtable', 'devtable', 'baserepo', 'unknown/repohere', Failures.UNAUTHORIZED_FOR_MOUNT),
 | |
| 
 | |
|   # Unsuccessful mount, no access.
 | |
|   ('public', 'public', 'baserepo', 'public/baserepo', Failures.UNAUTHORIZED_FOR_MOUNT),
 | |
| ])
 | |
| def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, expected_failure,
 | |
|                        manifest_protocol, pusher, puller, basic_images, liveserver_session,
 | |
|                        app_reloader):
 | |
|   # Push an image so we can attempt to mount it.
 | |
|   pusher.push(liveserver_session, push_namespace, push_repo, 'latest', basic_images,
 | |
|               credentials=(push_user, 'password'))
 | |
| 
 | |
|   # Push again, trying to mount the image layer(s) from the mount repo.
 | |
|   options = ProtocolOptions()
 | |
|   options.scopes = ['repository:devtable/newrepo:push,pull',
 | |
|                     'repository:%s:pull' % (mount_repo_name)]
 | |
|   options.mount_blobs = {image.id: mount_repo_name for image in basic_images}
 | |
| 
 | |
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                          credentials=('devtable', 'password'),
 | |
|                          options=options,
 | |
|                          expected_failure=expected_failure)
 | |
| 
 | |
|   if expected_failure is None:
 | |
|     # Pull to ensure it worked.
 | |
|     puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                 credentials=('devtable', 'password'))
 | |
| 
 | |
| 
 | |
| def get_robot_password(api_caller):
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
|   resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
 | |
|   return resp.json()['token']
 | |
| 
 | |
| 
 | |
| def get_encrypted_password(api_caller):
 | |
|   api_caller.conduct_auth('devtable', 'password')
 | |
|   resp = api_caller.post('/api/v1/user/clientkey', data=json.dumps(dict(password='password')),
 | |
|                          headers={'Content-Type': 'application/json'})
 | |
|   return resp.json()['key']
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('username, password, expect_success', [
 | |
|   # Invalid username.
 | |
|   ('invaliduser', 'somepassword', False),
 | |
| 
 | |
|   # Invalid password.
 | |
|   ('devtable', 'invalidpassword', False),
 | |
| 
 | |
|   # Invalid OAuth token.
 | |
|   ('$oauthtoken', 'unknown', False),
 | |
| 
 | |
|   # Invalid CLI token.
 | |
|   ('$app', 'invalid', False),
 | |
| 
 | |
|   # Valid.
 | |
|   ('devtable', 'password', True),
 | |
| 
 | |
|   # Robot.
 | |
|   ('buynlarge+ownerbot', get_robot_password, True),
 | |
| 
 | |
|   # Encrypted password.
 | |
|   ('devtable', get_encrypted_password, True),
 | |
| 
 | |
|   # OAuth.
 | |
|   ('$oauthtoken', 'test', True),
 | |
| 
 | |
|   # CLI Token.
 | |
|   ('$app', 'test', True),
 | |
| ])
 | |
| def test_login(username, password, expect_success, loginer, liveserver_session,
 | |
|                api_caller, app_reloader):
 | |
|   """ Test: Login flow. """
 | |
|   if not isinstance(password, str):
 | |
|     password = password(api_caller)
 | |
| 
 | |
|   loginer.login(liveserver_session, username, password, [], expect_success)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize('username, password, scopes, expected_access, expect_success', [
 | |
|   # No scopes.
 | |
|   ('devtable', 'password', [], [], True),
 | |
| 
 | |
|   # Basic pull.
 | |
|   ('devtable', 'password', ['repository:devtable/simple:pull'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # Basic push.
 | |
|   ('devtable', 'password', ['repository:devtable/simple:push'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push']},
 | |
|   ], True),
 | |
| 
 | |
|   # Basic push/pull.
 | |
|   ('devtable', 'password', ['repository:devtable/simple:push,pull'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # Admin.
 | |
|   ('devtable', 'password', ['repository:devtable/simple:push,pull,*'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
 | |
|   ], True),
 | |
| 
 | |
|   # Basic pull with endpoint.
 | |
|   ('devtable', 'password', ['repository:localhost:5000/devtable/simple:pull'], [
 | |
|     {'type': 'repository', 'name': 'localhost:5000/devtable/simple', 'actions': ['pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # Basic pull with invalid endpoint.
 | |
|   ('devtable', 'password', ['repository:someinvalid/devtable/simple:pull'], [], False),
 | |
| 
 | |
|   # Pull with no access.
 | |
|   ('public', 'password', ['repository:devtable/simple:pull'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
 | |
|   ], True),
 | |
| 
 | |
|   # Anonymous push and pull on a private repository.
 | |
|   ('', '', ['repository:devtable/simple:pull,push'], [
 | |
|     {'type': 'repository', 'name': 'devtable/simple', 'actions': []},
 | |
|   ], True),
 | |
| 
 | |
|   # Pull and push with no push access.
 | |
|   ('reader', 'password', ['repository:buynlarge/orgrepo:pull,push'], [
 | |
|     {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # OAuth.
 | |
|   ('$oauthtoken', 'test', ['repository:public/publicrepo:pull,push'], [
 | |
|     {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # Anonymous public repo.
 | |
|   ('', '', ['repository:public/publicrepo:pull,push'], [
 | |
|     {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
 | |
|   ], True),
 | |
| 
 | |
|   # Multiple scopes.
 | |
|   ('devtable', 'password',
 | |
|    ['repository:devtable/simple:push,pull,*', 'repository:buynlarge/orgrepo:pull'], [
 | |
|      {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
 | |
|      {'type': 'repository', 'name': 'buynlarge/orgrepo', 'actions': ['pull']},
 | |
|    ], True),
 | |
| 
 | |
|   # Multiple scopes.
 | |
|   ('devtable', 'password',
 | |
|    ['repository:devtable/simple:push,pull,*', 'repository:public/publicrepo:push,pull'], [
 | |
|      {'type': 'repository', 'name': 'devtable/simple', 'actions': ['push', 'pull', '*']},
 | |
|      {'type': 'repository', 'name': 'public/publicrepo', 'actions': ['pull']},
 | |
|    ], True),
 | |
| ])
 | |
| def test_login_scopes(username, password, scopes, expected_access, expect_success, v2_protocol,
 | |
|                       liveserver_session, api_caller, app_reloader):
 | |
|   """ Test: Login via the V2 auth protocol reacts correctly to requested scopes. """
 | |
|   if not isinstance(password, str):
 | |
|     password = password(api_caller)
 | |
| 
 | |
|   response = v2_protocol.login(liveserver_session, username, password, scopes, expect_success)
 | |
|   if not expect_success:
 | |
|     return
 | |
| 
 | |
|   # Validate the returned token.
 | |
|   encoded = response.json()['token']
 | |
|   payload = decode_bearer_header('Bearer ' + encoded, instance_keys,
 | |
|                                  {'SERVER_HOSTNAME': 'localhost:5000'})
 | |
|   assert payload is not None
 | |
|   assert payload['access'] == expected_access
 | |
| 
 | |
| 
 | |
| def test_push_pull_same_blobs(pusher, puller, liveserver_session, app_reloader):
 | |
|   """ Test: Push and pull of an image to a new repository where a blob is shared between layers. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   layer_bytes = layer_bytes_for_contents('some contents')
 | |
|   images = [
 | |
|     Image(id='parentid', bytes=layer_bytes, parent_id=None),
 | |
|     Image(id='someid', bytes=layer_bytes, parent_id='parentid'),
 | |
|   ]
 | |
| 
 | |
|   options = ProtocolOptions()
 | |
|   options.skip_head_checks = True # Since the blob will already exist.
 | |
| 
 | |
|   # Push a new repository.
 | |
|   pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|               credentials=credentials, options=options)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', images,
 | |
|               credentials=credentials, options=options)
 | |
| 
 | |
| 
 | |
| def test_push_tag_existing_image(v1_protocol, puller, basic_images, liveserver_session, app_reloader):
 | |
|   """ Test: Push a new tag on an existing manifest/image. """
 | |
|   credentials = ('devtable', 'password')
 | |
| 
 | |
|   # Push a new repository.
 | |
|   result = v1_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images,
 | |
|                             credentials=credentials)
 | |
| 
 | |
|   # Push the same image/manifest to another tag in the repository.
 | |
|   v1_protocol.tag(liveserver_session, 'devtable', 'newrepo', 'anothertag', basic_images[-1],
 | |
|                   credentials=credentials)
 | |
| 
 | |
|   # Pull the repository to verify.
 | |
|   puller.pull(liveserver_session, 'devtable', 'newrepo', 'anothertag', basic_images,
 | |
|               credentials=credentials)
 |