import unittest import requests import os import math import random import string import Crypto.Random from flask import request, jsonify from flask.blueprints import Blueprint from flask.ext.testing import LiveServerTestCase from app import app from data.database import close_db_filter, configure from endpoints.v1 import v1_bp from endpoints.v2 import v2_bp from endpoints.verbs import verbs from endpoints.v2.manifest import SignedManifestBuilder from endpoints.api import api_bp from initdb import wipe_database, initialize_database, populate_database from endpoints.csrf import generate_csrf_token from tempfile import NamedTemporaryFile from jsonschema import validate as validate_schema import endpoints.decorated import json import features import hashlib import logging import tarfile import shutil from jwkest.jws import SIGNER_ALGS from jwkest.jwk import RSAKey from Crypto.PublicKey import RSA from cStringIO import StringIO from digest.checksums import compute_simple try: app.register_blueprint(v1_bp, url_prefix='/v1') app.register_blueprint(v2_bp, url_prefix='/v2') app.register_blueprint(verbs, url_prefix='/c1') app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # Blueprint was already registered pass # Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the # DB connection. testbp = Blueprint('testbp', __name__) logger = logging.getLogger(__name__) @testbp.route('/csrf', methods=['GET']) def generate_csrf(): return generate_csrf_token() @testbp.route('/feature/', methods=['POST']) def set_feature(feature_name): import features old_value = features._FEATURES[feature_name].value features._FEATURES[feature_name].value = request.get_json()['value'] return jsonify({'old_value': old_value}) @testbp.route('/reloadapp', methods=['POST']) def reload_app(): # Close any existing connection. close_db_filter(None) # Reload the database config. configure(app.config) # Reload random after the process split, as it cannot be used uninitialized across forks. Crypto.Random.atfork() return 'OK' app.register_blueprint(testbp, url_prefix='/__test') class TestFeature(object): """ Helper object which temporarily sets the value of a feature flag. """ def __init__(self, test_case, feature_flag, test_value): self.test_case = test_case self.feature_flag = feature_flag self.test_value = test_value self.old_value = None def __enter__(self): result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.test_value)), headers={'Content-Type': 'application/json'}) result_data = json.loads(result.text) self.old_value = result_data['old_value'] def __exit__(self, type, value, traceback): self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.old_value)), headers={'Content-Type': 'application/json'}) _PORT_NUMBER = 5001 _CLEAN_DATABASE_PATH = None _JWK = RSAKey(key=RSA.generate(2048)) def get_new_database_uri(): # If a clean copy of the database has not yet been created, create one now. global _CLEAN_DATABASE_PATH if not _CLEAN_DATABASE_PATH: wipe_database() initialize_database() populate_database() close_db_filter(None) # Save the path of the clean database. _CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name # Create a new temp file to be used as the actual backing database for the test. # Note that we have the close() the file to ensure we can copy to it via shutil. local_db_file = NamedTemporaryFile(delete=True) local_db_file.close() # Copy the clean database to the path. shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name) return 'sqlite:///{0}'.format(local_db_file.name) class RegistryTestCaseMixin(LiveServerTestCase): def create_app(self): global _PORT_NUMBER _PORT_NUMBER = _PORT_NUMBER + 1 if os.environ.get('DEBUG') == 'true': app.config['DEBUG'] = True app.config['TESTING'] = True app.config['LIVESERVER_PORT'] = _PORT_NUMBER app.config['DB_URI'] = get_new_database_uri() return app def setUp(self): self.clearSession() # Tell the remote running app to reload the database and app. By default, the app forks from the # current context and has already loaded the DB config with the *original* DB URL. We call # the remote reload method to force it to pick up the changes to DB_URI set in the create_app # method. self.conduct('POST', '/__test/reloadapp') def clearSession(self): self.session = requests.Session() self.signature = None self.docker_token = 'true' self.jwt = None # Load the CSRF token. self.csrf_token = '' self.csrf_token = self.conduct('GET', '/__test/csrf').text def do_tag(self, namespace, repository, tag, image_id, expected_code=200): self.conduct('PUT', '/v1/repositories/%s/%s/tags/%s' % (namespace, repository, tag), data='"%s"' % image_id, expected_code=expected_code, auth='sig') def conduct_api_login(self, username, password): self.conduct('POST', '/api/v1/signin', data=json.dumps(dict(username=username, password=password)), headers={'Content-Type': 'application/json'}) def change_repo_visibility(self, repository, namespace, visibility): self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace), data=json.dumps(dict(visibility=visibility)), headers={'Content-Type': 'application/json'}) class BaseRegistryMixin(object): def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200, json_data=None): params = params or {} params['_csrf_token'] = self.csrf_token headers = headers or {} auth_tuple = None if self.docker_token: headers['X-Docker-Token'] = self.docker_token if auth == 'sig': if self.signature: headers['Authorization'] = 'token ' + self.signature elif auth == 'jwt': if self.jwt: headers['Authorization'] = 'Bearer ' + self.jwt elif auth: auth_tuple = auth if json_data is not None: data = json.dumps(json_data) headers['Content-Type'] = 'application/json' response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data, auth=auth_tuple, params=params) if response.status_code != expected_code: print response.text if 'www-authenticate' in response.headers: self.signature = response.headers['www-authenticate'] if 'X-Docker-Token' in response.headers: self.docker_token = response.headers['X-Docker-Token'] self.assertEquals(response.status_code, expected_code) return response def _get_default_images(self): return [{'id': 'someid', 'contents': 'somecontent'}] class V1RegistryMixin(BaseRegistryMixin): def v1_ping(self): self.conduct('GET', '/v1/_ping') class V1RegistryPushMixin(V1RegistryMixin): def do_push(self, namespace, repository, username, password, images=None, expected_code=201): images = images or self._get_default_images() auth = (username, password) # Ping! self.v1_ping() # PUT /v1/repositories/{namespace}/{repository}/ self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository), data=json.dumps(images), auth=auth, expected_code=expected_code) if expected_code != 201: return last_image_id = None for image_data in images: image_id = image_data['id'] last_image_id = image_id # PUT /v1/images/{imageID}/json self.conduct('PUT', '/v1/images/%s/json' % image_id, data=json.dumps({'id': image_id}), auth='sig') # PUT /v1/images/{imageID}/layer tar_file_info = tarfile.TarInfo(name='image_name') tar_file_info.type = tarfile.REGTYPE tar_file_info.size = len(image_id) layer_data = StringIO() tar_file = tarfile.open(fileobj=layer_data, mode='w|gz') tar_file.addfile(tar_file_info, StringIO(image_id)) tar_file.close() layer_bytes = layer_data.getvalue() layer_data.close() self.conduct('PUT', '/v1/images/%s/layer' % image_id, data=StringIO(layer_bytes), auth='sig') # PUT /v1/images/{imageID}/checksum checksum = compute_simple(StringIO(layer_bytes), json.dumps({'id': image_id})) self.conduct('PUT', '/v1/images/%s/checksum' % image_id, headers={'X-Docker-Checksum-Payload': checksum}, auth='sig') # PUT /v1/repositories/{namespace}/{repository}/tags/latest self.do_tag(namespace, repository, 'latest', images[0]['id']) # PUT /v1/repositories/{namespace}/{repository}/images self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository), expected_code=204, auth='sig') class V1RegistryPullMixin(V1RegistryMixin): def do_pull(self, namespace, repository, username=None, password='password', expected_code=200, images=None): images = images or self._get_default_images() auth = None if username: auth = (username, password) # Ping! self.v1_ping() prefix = '/v1/repositories/%s/%s/' % (namespace, repository) # GET /v1/repositories/{namespace}/{repository}/ self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code) if expected_code != 200: return # GET /v1/repositories/{namespace}/{repository}/ result = json.loads(self.conduct('GET', prefix + 'tags', auth='sig').text) self.assertEquals(len(images), len(result.values())) for image_data in images: image_id = image_data['id'] self.assertIn(image_id, result.values()) # /v1/images/{imageID}/{ancestry, json, layer} image_prefix = '/v1/images/%s/' % image_id self.conduct('GET', image_prefix + 'ancestry', auth='sig') self.conduct('GET', image_prefix + 'json', auth='sig') self.conduct('GET', image_prefix + 'layer', auth='sig') class V2RegistryMixin(BaseRegistryMixin): MANIFEST_SCHEMA = { 'type': 'object', 'properties': { 'name': { 'type': 'string', }, 'tag': { 'type': 'string', }, 'signatures': { 'type': 'array', 'itemType': { 'type': 'object', }, }, 'fsLayers': { 'type': 'array', 'itemType': { 'type': 'object', 'properties': { 'blobSum': { 'type': 'string', }, }, 'required': 'blobSum', }, }, 'history': { 'type': 'array', 'itemType': { 'type': 'object', 'properties': { 'v1Compatibility': { 'type': 'object', }, }, 'required': ['v1Compatibility'], }, }, }, 'required': ['name', 'tag', 'fsLayers', 'history', 'signatures'], } def v2_ping(self): response = self.conduct('GET', '/v2/', expected_code=200 if self.jwt else 401, auth='jwt') self.assertEquals(response.headers['Docker-Distribution-API-Version'], 'registry/2.0') def do_auth(self, username, password, namespace, repository, expected_code=200, scopes=[]): auth = (username, password) params = { 'account': username, 'scope': 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes)), 'service': app.config['SERVER_HOSTNAME'], } response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password), expected_code=expected_code) if expected_code == 200: response_json = json.loads(response.text) self.assertIsNotNone(response_json.get('token')) self.jwt = response_json['token'] return response class V2RegistryPushMixin(V2RegistryMixin): def do_push(self, namespace, repository, username, password, images=None, tag_name=None, cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200, scopes=None): images = images or self._get_default_images() # Ping! self.v2_ping() # Auth. self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'], expected_code=expected_auth_code) if expected_auth_code != 200: return # Build a fake manifest. tag_name = tag_name or 'latest' builder = SignedManifestBuilder(namespace, repository, tag_name) for image_data in images: checksum = 'sha256:' + hashlib.sha256(image_data['contents']).hexdigest() if invalid: checksum = 'sha256:' + hashlib.sha256('foobarbaz').hexdigest() builder.add_layer(checksum, json.dumps(image_data)) # Build the manifest. manifest = builder.build(_JWK) # Push the image's layers. checksums = {} for image_data in images: image_id = image_data['id'] full_contents = image_data['contents'] chunks = image_data.get('chunks') # Layer data should not yet exist. checksum = 'sha256:' + hashlib.sha256(full_contents).hexdigest() self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum), expected_code=404, auth='jwt') # Start a new upload of the layer data. response = self.conduct('POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repository), expected_code=202, auth='jwt') upload_uuid = response.headers['Docker-Upload-UUID'] location = response.headers['Location'][len(self.get_server_url()):] # PATCH the image data into the layer. if chunks is None: self.conduct('PATCH', location, data=full_contents, expected_code=204, auth='jwt') else: for chunk in chunks: if len(chunk) == 3: (start_byte, end_byte, expected_code) = chunk else: (start_byte, end_byte) = chunk expected_code = 204 contents_chunk = full_contents[start_byte:end_byte] self.conduct('PATCH', location, data=contents_chunk, expected_code=expected_code, auth='jwt', headers={'Range': 'bytes=%s-%s' % (start_byte, end_byte)}) if expected_code != 204: return # Retrieve the upload status at each point. status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repository, upload_uuid) response = self.conduct('GET', status_url, expected_code=204, auth='jwt', headers=dict(host=self.get_server_url())) self.assertEquals(response.headers['Docker-Upload-UUID'], upload_uuid) self.assertEquals(response.headers['Range'], "bytes=0-%s" % end_byte) if cancel: self.conduct('DELETE', location, params=dict(digest=checksum), expected_code=204, auth='jwt') # Ensure the upload was canceled. status_url = '/v2/%s/%s/blobs/uploads/%s' % (namespace, repository, upload_uuid) self.conduct('GET', status_url, expected_code=404, auth='jwt', headers=dict(host=self.get_server_url())) return # Finish the layer upload with a PUT. response = self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201, auth='jwt') self.assertEquals(response.headers['Docker-Content-Digest'], checksum) checksums[image_id] = checksum # Ensure the layer exists now. response = self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum), expected_code=200, auth='jwt') self.assertEquals(response.headers['Docker-Content-Digest'], checksum) self.assertEquals(response.headers['Content-Length'], str(len(full_contents))) # Write the manifest. put_code = 404 if invalid else expected_manifest_code self.conduct('PUT', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name), data=manifest.bytes, expected_code=put_code, headers={'Content-Type': 'application/json'}, auth='jwt') return checksums, manifest.digest class V2RegistryPullMixin(V2RegistryMixin): def do_pull(self, namespace, repository, username=None, password='password', expected_code=200, manifest_id=None, expected_manifest_code=200, images=None): images = images or self._get_default_images() # Ping! self.v2_ping() # Auth. self.do_auth(username, password, namespace, repository, scopes=['pull'], expected_code=expected_code) if expected_code != 200: return # Retrieve the manifest for the tag or digest. manifest_id = manifest_id or 'latest' response = self.conduct('GET', '/v2/%s/%s/manifests/%s' % (namespace, repository, manifest_id), auth='jwt', expected_code=expected_manifest_code) if expected_manifest_code != 200: return manifest_data = json.loads(response.text) # Ensure the manifest returned by us is valid. validate_schema(manifest_data, V2RegistryMixin.MANIFEST_SCHEMA) # Verify the layers. blobs = {} for layer in manifest_data['fsLayers']: blob_id = layer['blobSum'] result = self.conduct('GET', '/v2/%s/%s/blobs/%s' % (namespace, repository, blob_id), expected_code=200, auth='jwt') blobs[blob_id] = result.text # Verify the V1 metadata is present for each expected image. found_v1_layers = set() history = manifest_data['history'] for entry in history: v1_history = json.loads(entry['v1Compatibility']) found_v1_layers.add(v1_history['id']) for image in images: self.assertIn(image['id'], found_v1_layers) return blobs class RegistryTestsMixin(object): def test_push_pull_logging(self): # Push a new repository. self.do_push('public', 'newrepo', 'public', 'password') # Retrieve the logs and ensure the push was added. self.conduct_api_login('public', 'password') result = self.conduct('GET', '/api/v1/repository/public/newrepo/logs') logs = result.json()['logs'] self.assertEquals(1, len(logs)) self.assertEquals('push_repo', logs[0]['kind']) self.assertEquals('public', logs[0]['performer']['name']) # Pull the repository. self.do_pull('public', 'newrepo', 'public', 'password') # Retrieve the logs and ensure the pull was added. result = self.conduct('GET', '/api/v1/repository/public/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('public', logs[0]['performer']['name']) def test_push_pull_logging_byrobot(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot') robot_token = json.loads(resp.text)['token'] # Push a new repository. self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Retrieve the logs and ensure the push was added. result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs') logs = result.json()['logs'] self.assertEquals(1, len(logs)) self.assertEquals('push_repo', logs[0]['kind']) self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name']) # Pull the repository. self.do_pull('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Retrieve the logs and ensure the pull was added. result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name']) def test_push_pull_logging_byoauth(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') # Pull the repository. self.do_pull('devtable', 'newrepo', '$oauthtoken', 'test') # Retrieve the logs and ensure the pull was added. self.conduct_api_login('devtable', 'password') result = self.conduct('GET', '/api/v1/repository/devtable/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('devtable', logs[0]['performer']['name']) self.assertEquals(1, logs[0]['metadata']['oauth_token_id']) def test_pull_publicrepo_anonymous(self): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) self.do_pull('public', 'newrepo', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository anonymously, which should succeed because the repository is public. self.do_pull('public', 'newrepo') def test_pull_publicrepo_devtable(self): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_pull_private_repo(self): # Add a new repository under the devtable user, so we have a real repository to pull. self.do_push('devtable', 'newrepo', 'devtable', 'password') self.clearSession() # First try to pull the (currently private) repo as public, which should fail as it belongs # to devtable. self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403) # Pull the repository as devtable, which should succeed because the repository is owned by # devtable. self.do_pull('devtable', 'newrepo', 'devtable', 'password') def test_public_no_anonymous_access_with_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_private_no_anonymous_access(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') def test_public_no_anonymous_access_no_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as anonymous, which should fail as it # is private. self.do_pull('public', 'newrepo', expected_code=401) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Try again to pull the (currently public) repo as anonymous, which should fail as # anonymous access is disabled. self.do_pull('public', 'newrepo', expected_code=401) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_create_repo_creator_user(self): self.do_push('buynlarge', 'newrepo', 'creator', 'password') # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_owner(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot') robot_token = json.loads(resp.text)['token'] self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_creator(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot') robot_token = json.loads(resp.text)['token'] self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 registry. """ def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expected_code=400) def test_push_unicode_metadata(self): self.conduct_api_login('devtable', 'password') images = [{ 'id': 'onlyimagehere', 'comment': 'Pawe\xc5\x82 Kami\xc5\x84ski '.decode('utf-8') }] self.do_push('devtable', 'unicodetest', 'devtable', 'password', images) self.do_pull('devtable', 'unicodetest', 'devtable', 'password', images=images) def test_tag_validation(self): image_id = 'onlyimagehere' images = [{ 'id': image_id }] self.do_push('public', 'newrepo', 'public', 'password', images) self.do_tag('public', 'newrepo', '1', image_id) self.do_tag('public', 'newrepo', 'x' * 128, image_id) self.do_tag('public', 'newrepo', '', image_id, expected_code=400) self.do_tag('public', 'newrepo', 'x' * 129, image_id, expected_code=400) self.do_tag('public', 'newrepo', '.fail', image_id, expected_code=400) self.do_tag('public', 'newrepo', '-fail', image_id, expected_code=400) class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V2 registry. """ def test_delete_manifest(self): # Push a new repo with the latest tag. (_, digest) = self.do_push('devtable', 'newrepo', 'devtable', 'password') # Ensure the pull works. self.do_pull('devtable', 'newrepo', 'devtable', 'password') # Conduct auth for the write scope. self.do_auth('devtable', 'password', 'devtable', 'newrepo', scopes=['push']) # Delete the digest. self.conduct('DELETE', '/v2/devtable/newrepo/manifests/' + digest, auth='jwt', expected_code=202) # Ensure the tag no longer exists. self.do_pull('devtable', 'newrepo', 'devtable', 'password', expected_manifest_code=404) def test_push_only_push_scope(self): images = [{ 'id': 'onlyimagehere', 'contents': 'foobar', }] self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images, scopes=['push']) def test_attempt_push_only_push_scope(self): images = [{ 'id': 'onlyimagehere', 'contents': 'foobar', }] self.do_push('public', 'somenewrepo', 'devtable', 'password', images, scopes=['push'], expected_auth_code=403) def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo/somesubrepo', 'devtable', 'password', images, expected_auth_code=400) def test_invalid_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', invalid=True) def test_cancel_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True) def test_pull_by_checksum(self): # Add a new repository under the user, so we have a real repository to pull. _, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password') # Attempt to pull by digest. self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest) def test_pull_invalid_image_tag(self): # Add a new repository under the user, so we have a real repository to pull. self.do_push('devtable', 'newrepo', 'devtable', 'password') self.clearSession() # Attempt to pull the invalid tag. self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid', expected_manifest_code=404) def test_partial_upload_below_5mb(self): chunksize = 1024 * 1024 * 2 size = chunksize * 3 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunk_count = int(math.ceil((len(contents) * 1.0) / chunksize)) chunks = [(index * chunksize, (index + 1)*chunksize) for index in range(chunk_count)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_way_below_5mb(self): size = 1024 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (100, size)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_resend_below_5mb(self): size = 150 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (10, size)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_try_resend_with_gap(self): size = 150 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (101, size, 416)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Attempt to push the chunked upload, which should fail. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) def test_multiple_layers_invalid(self): # Attempt to push a manifest with an image depending on an unknown base layer. images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', } ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images, expected_manifest_code=400) def test_multiple_layers(self): # Push a manifest with multiple layers. images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', }, { 'id': 'baseid', 'contents': 'The base image', } ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) def test_invalid_regname(self): self.do_push('devtable', 'this/is/a/repo', 'devtable', 'password', expected_auth_code=400) def test_multiple_tags(self): latest_images = [ { 'id': 'latestid', 'contents': 'the latest image' } ] foobar_images = [ { 'id': 'foobarid', 'contents': 'the foobar image', } ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=latest_images, tag_name='latest') self.do_push('devtable', 'newrepo', 'devtable', 'password', images=foobar_images, tag_name='foobar') # Retrieve the tags. response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt', expected_code=200) data = json.loads(response.text) self.assertEquals(data['name'], "devtable/newrepo") self.assertIn('latest', data['tags']) self.assertIn('foobar', data['tags']) # Retrieve the tags with pagination. response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt', params=dict(n=1), expected_code=200) data = json.loads(response.text) self.assertEquals(data['name'], "devtable/newrepo") self.assertEquals(len(data['tags']), 1) self.assertTrue(response.headers['Link'].find('n=1&last=2') > 0) # Try to get tags before a repo exists. self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401) class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 push, V2 pull registry. """ class V1PullV2PushRegistryTests(V1RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 pull, V2 push registry. """ class VerbTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase): """ Tests for registry verbs. """ def get_squashed_image(self): response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig') tar = tarfile.open(fileobj=StringIO(response.content)) return tar def test_squashed_changes(self): initial_images = [ { 'id': 'initialid', 'contents': 'the initial image', }, ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' # Pull the squashed version of the tag. tar = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) # Change the images. updated_images = [ { 'id': 'updatedid', 'contents': 'the updated image', }, ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=updated_images) updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac' # Pull the squashed version of the tag and ensure it has changed. tar = self.get_squashed_image() self.assertTrue(updated_image_id in tar.getnames()) def test_multilayer_squashing(self): images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', }, { 'id': 'baseid', 'contents': 'The base image', } ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the squashed version of the tag. expected_image_id = 'bd590ae79fba5ebc6550aaf016c0bd0f49b1d78178e0f83e0ca1c56c2bb7e7bf' expected_names = ['repositories', expected_image_id, '%s/json' % expected_image_id, '%s/VERSION' % expected_image_id, '%s/layer.tar' % expected_image_id] tar = self.get_squashed_image() self.assertEquals(expected_names, tar.getnames()) self.assertEquals('1.0', tar.extractfile(tar.getmember('%s/VERSION' % expected_image_id)).read()) json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read()) # Ensure the JSON loads and parses. result = json.loads(json_data) self.assertEquals(expected_image_id, result['id']) # Ensure that the "image_name" file refers to the latest image, as it is the top layer. layer_tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) image_name = layer_tar.extractfile(layer_tar.getmember('image_name')).read() self.assertEquals('latestid', image_name) class V1RegistryLoginMixin(object): def do_login(self, username, password, scope, expect_success=True): data = { 'username': username, 'password': password, } response = self.conduct('POST', '/v1/users/', json_data=data, expected_code=400) if expect_success: self.assertEquals(response.text, '"Username or email already exists"') else: self.assertNotEquals(response.text, '"Username or email already exists"') class V2RegistryLoginMixin(object): def do_login(self, username, password, scope, expect_success=True, expected_code=None): params = { 'account': username, 'scope': scope, 'service': app.config['SERVER_HOSTNAME'], } if expected_code is None: if expect_success: expected_code = 200 else: expected_code = 403 response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password), expected_code=expected_code) return response class LoginTests(object): """ Generic tests for registry login. """ def test_invalid_username_knownrepo(self): self.do_login('invaliduser', 'somepassword', expect_success=False, scope='repository:devtable/simple:pull') def test_invalid_password_knownrepo(self): self.do_login('devtable', 'somepassword', expect_success=False, scope='repository:devtable/simple:pull') def test_validuser_knownrepo(self): self.do_login('devtable', 'password', expect_success=True, scope='repository:devtable/simple:pull') def test_validuser_encryptedpass(self): # Generate an encrypted password. self.conduct_api_login('devtable', 'password') resp = self.conduct('POST', '/api/v1/user/clientkey', json_data=dict(password='password')) encryptedpassword = resp.json()['key'] self.do_login('devtable', encryptedpassword, expect_success=True, scope='repository:devtable/simple:pull') def test_robotkey(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/user/robots/dtrobot') robot_token = resp.json()['token'] self.do_login('devtable+dtrobot', robot_token, expect_success=True, scope='repository:devtable/complex:pull') def test_oauth(self): self.do_login('$oauthtoken', 'test', expect_success=True, scope='repository:devtable/complex:pull') class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): """ Tests for V1 login. """ pass # No additional tests. class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): """ Tests for V2 login. """ def test_nouser_noscope(self): self.do_login('', '', expected_code=401, scope='') def test_validuser_unknownrepo(self): self.do_login('devtable', 'password', expect_success=False, scope='repository:invalidnamespace/simple:pull') def test_validuser_unknownnamespacerepo(self): self.do_login('devtable', 'password', expect_success=True, scope='repository:devtable/newrepo:push') def test_validuser_noaccess(self): self.do_login('public', 'password', expect_success=False, scope='repository:devtable/simple:pull') def test_validuser_noscope(self): self.do_login('public', 'password', expect_success=True, scope=None) def test_invaliduser_noscope(self): self.do_login('invaliduser', 'invalidpass', expected_code=401, scope=None) def test_invalidpassword_noscope(self): self.do_login('public', 'invalidpass', expected_code=401, scope=None) def test_oauth_noaccess(self): self.do_login('$oauthtoken', 'test', expect_success=False, scope='repository:public/publicrepo:pull,push') def test_nouser_pull_publicrepo(self): self.do_login('', '', expect_success=True, scope='repository:public/publicrepo:pull') def test_nouser_push_publicrepo(self): self.do_login('', '', expected_code=401, scope='repository:public/publicrepo:push') if __name__ == '__main__': unittest.main()