"""Live-server integration tests for the V1 and V2 registry endpoints.

The module registers the v1, v2 and api blueprints on the application, adds a
small ``/__test`` blueprint used by the tests to talk to the forked server
process (CSRF token generation, feature-flag toggling, database reload), and
defines push/pull mixins that are combined into four concrete test cases.
"""

# NOTE: the import order below is intentionally left as it was; `from app import app`
# and `import endpoints.decorated` are imported for their side effects.
import unittest
import requests
import os
import Crypto.Random

from flask import request, jsonify
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase

from app import app
from data.database import close_db_filter, configure
from endpoints.v1 import v1_bp
from endpoints.v2 import v2_bp
from endpoints.v2.manifest import SignedManifestBuilder
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
from tempfile import NamedTemporaryFile

import endpoints.decorated
import json
import features
import hashlib

import tarfile
import shutil

from jwkest.jws import SIGNER_ALGS
from jwkest.jwk import RSAKey
from Crypto.PublicKey import RSA

from cStringIO import StringIO
from digest.checksums import compute_simple

try:
    app.register_blueprint(v1_bp, url_prefix='/v1')
    app.register_blueprint(v2_bp, url_prefix='/v2')
    app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
    # Blueprint was already registered.
    pass


# Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the
# DB connection.
testbp = Blueprint('testbp', __name__)


@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
    """Return a freshly generated CSRF token as the response body."""
    return generate_csrf_token()


# The feature name is taken from the URL; TestFeature posts to
# '/__test/feature/<name>', so the rule must declare the variable.
@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
    """Set a feature flag to the JSON body's 'value' and return its previous value."""
    feature = features._FEATURES[feature_name]
    old_value = feature.value
    feature.value = request.get_json()['value']
    return jsonify({'old_value': old_value})


@testbp.route('/reloadapp', methods=['POST'])
def reload_app():
    """Make the server process re-read the database configuration."""
    # Close any existing connection.
    close_db_filter(None)

    # Reload the database config.
    configure(app.config)

    # Reload random after the process split, as it cannot be used uninitialized across forks.
    Crypto.Random.atfork()
    return 'OK'


app.register_blueprint(testbp, url_prefix='/__test')


class TestFeature(object):
    """Context manager which temporarily sets the value of a feature flag.

    The flag is changed on the running server through the ``/__test/feature``
    endpoint on entry, and restored to the value the server reported on exit.
    """

    def __init__(self, test_case, feature_flag, test_value):
        self.test_case = test_case
        self.feature_flag = feature_flag
        self.test_value = test_value
        self.old_value = None

    def _set_remote_value(self, value):
        """POST `value` for this flag to the test endpoint and return the response."""
        return self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
                                      data=json.dumps(dict(value=value)),
                                      headers={'Content-Type': 'application/json'})

    def __enter__(self):
        result = self._set_remote_value(self.test_value)
        self.old_value = json.loads(result.text)['old_value']

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Restore the previous value even if the test body raised.
        self._set_remote_value(self.old_value)


_PORT_NUMBER = 5001
_CLEAN_DATABASE_PATH = None


def get_new_database_uri():
    """Return a sqlite URI pointing at a fresh copy of the populated test database.

    The clean database is built once (on first call) and its path cached in
    ``_CLEAN_DATABASE_PATH``; every call copies it to a new temporary path.
    """
    # If a clean copy of the database has not yet been created, create one now.
    global _CLEAN_DATABASE_PATH
    if not _CLEAN_DATABASE_PATH:
        wipe_database()
        initialize_database()
        populate_database()
        close_db_filter(None)

        # Save the path of the clean database.
        _CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name

    # Create a new temp file to be used as the actual backing database for the test.
    # Note that we have to close() the file to ensure we can copy to it via shutil;
    # only its (unique) name is reused.
    local_db_file = NamedTemporaryFile(delete=True)
    local_db_file.close()

    # Copy the clean database to the path.
    shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name)
    return 'sqlite:///{0}'.format(local_db_file.name)


class RegistryTestCaseMixin(LiveServerTestCase):
    """Shared live-server setup: per-test database, session state and API helpers."""

    maxDiff = None

    def create_app(self):
        """Configure the app with a new port and a fresh database for this test."""
        global _PORT_NUMBER
        _PORT_NUMBER = _PORT_NUMBER + 1

        app.config['DEBUG'] = True
        app.config['TESTING'] = True
        app.config['LIVESERVER_PORT'] = _PORT_NUMBER
        app.config['DB_URI'] = get_new_database_uri()
        return app

    def setUp(self):
        self.clearSession()

        # Tell the remote running app to reload the database and app. By default, the app
        # forks from the current context and has already loaded the DB config with the
        # *original* DB URL. We call the remote reload method to force it to pick up the
        # changes to DB_URI set in the create_app method.
        self.conduct('POST', '/__test/reloadapp')

    def clearSession(self):
        """Start a new HTTP session and reset all cached credentials."""
        self.session = requests.Session()
        self.signature = None
        self.docker_token = 'true'
        self.jwt = None

        # Load the CSRF token. It must be set (to empty) first because conduct() reads it.
        self.csrf_token = ''
        self.csrf_token = self.conduct('GET', '/__test/csrf').text

    def conduct_api_login(self, username, password):
        """Sign in to the web API with the given credentials."""
        self.conduct('POST', '/api/v1/signin',
                     data=json.dumps(dict(username=username, password=password)),
                     headers={'Content-Type': 'application/json'})

    def change_repo_visibility(self, repository, namespace, visibility):
        """Change a repository's visibility through the web API.

        NOTE(review): the first two arguments are substituted into the URL in the
        order given, and the tests in this module pass the namespace first and the
        repository second, so the parameter names look swapped. They are left
        unchanged to keep the signature stable.
        """
        self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
                     data=json.dumps(dict(visibility=visibility)),
                     headers={'Content-Type': 'application/json'})


class BaseRegistryMixin(object):
    """Provides `conduct`, the single HTTP entry point used by all registry mixins."""

    def conduct(self, method, url, headers=None, data=None, auth=None, params=None,
                expected_code=200):
        """Issue a request against the live server and assert on its status code.

        `auth` may be 'sig' (send the stored V1 signature, if any), 'jwt' (send the
        stored bearer token, if any), or any other truthy value, which is handed to
        requests as its `auth` argument. The stored signature and docker token are
        refreshed from the response headers. Returns the response.
        """
        # Copy so that caller-supplied dictionaries are never mutated.
        params = dict(params or {})
        params['_csrf_token'] = self.csrf_token

        headers = dict(headers or {})
        auth_tuple = None

        if self.docker_token:
            headers['X-Docker-Token'] = self.docker_token

        if auth == 'sig':
            if self.signature:
                headers['Authorization'] = 'token ' + self.signature
        elif auth == 'jwt':
            if self.jwt:
                headers['Authorization'] = 'Bearer ' + self.jwt
        elif auth:
            auth_tuple = auth

        response = self.session.request(method, self.get_server_url() + url, headers=headers,
                                        data=data, auth=auth_tuple, params=params)
        if response.status_code != expected_code:
            # Dump the body to make the assertion failure below easier to diagnose.
            print(response.text)

        if 'www-authenticate' in response.headers:
            self.signature = response.headers['www-authenticate']

        if 'X-Docker-Token' in response.headers:
            self.docker_token = response.headers['X-Docker-Token']

        self.assertEqual(response.status_code, expected_code)
        return response


class V1RegistryMixin(BaseRegistryMixin):
    def v1_ping(self):
        """Ping the V1 registry endpoint."""
        self.conduct('GET', '/v1/_ping')


class V1RegistryPushMixin(V1RegistryMixin):
    def do_push(self, namespace, repository, username, password, images):
        """Push `images` (a list of dicts with an 'id' key) using the V1 protocol.

        The first image in the list is tagged as 'latest'.
        """
        auth = (username, password)

        # Ping!
        self.v1_ping()

        # PUT /v1/repositories/{namespace}/{repository}/
        data = [{"id": image['id']} for image in images]
        self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
                     data=json.dumps(data), auth=auth, expected_code=201)

        for image in images:
            # PUT /v1/images/{imageID}/json
            self.conduct('PUT', '/v1/images/%s/json' % image['id'],
                         data=json.dumps(image), auth='sig')

            # PUT /v1/images/{imageID}/layer
            # The layer is a gzipped tar holding one file whose contents are the image ID.
            tar_file_info = tarfile.TarInfo(name='image_name')
            tar_file_info.type = tarfile.REGTYPE
            tar_file_info.size = len(image['id'])

            layer_data = StringIO()

            tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
            tar_file.addfile(tar_file_info, StringIO(image['id']))
            tar_file.close()

            layer_bytes = layer_data.getvalue()
            layer_data.close()

            self.conduct('PUT', '/v1/images/%s/layer' % image['id'],
                         data=StringIO(layer_bytes), auth='sig')

            # PUT /v1/images/{imageID}/checksum
            checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
            self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
                         headers={'X-Docker-Checksum-Payload': checksum},
                         auth='sig')

        # PUT /v1/repositories/{namespace}/{repository}/tags/latest
        self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
                     data='"' + images[0]['id'] + '"',
                     auth='sig')

        # PUT /v1/repositories/{namespace}/{repository}/images
        self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
                     expected_code=204,
                     auth='sig')


class V1RegistryPullMixin(V1RegistryMixin):
    def do_pull(self, namespace, repository, username=None, password='password',
                expected_code=200):
        """Pull every tagged image of the repository using the V1 protocol.

        When `expected_code` is not 200, only the initial request is made (and
        asserted to return that code). Without a `username` the pull is anonymous.
        """
        auth = None
        if username:
            auth = (username, password)

        # Ping!
        self.v1_ping()

        prefix = '/v1/repositories/%s/%s/' % (namespace, repository)

        # GET /v1/repositories/{namespace}/{repository}/images
        self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
        if expected_code != 200:
            return

        # GET /v1/repositories/{namespace}/{repository}/tags
        result = json.loads(self.conduct('GET', prefix + 'tags', auth='sig').text)

        for image_id in result.values():
            # /v1/images/{imageID}/{ancestry, json, layer}
            image_prefix = '/v1/images/%s/' % image_id
            self.conduct('GET', image_prefix + 'ancestry', auth='sig')
            self.conduct('GET', image_prefix + 'json', auth='sig')
            self.conduct('GET', image_prefix + 'layer', auth='sig')


class V2RegistryMixin(BaseRegistryMixin):
    def v2_ping(self):
        """Ping the V2 endpoint; a 401 is expected while no JWT has been obtained."""
        self.conduct('GET', '/v2/', expected_code=200 if self.jwt else 401, auth='jwt')

    def do_auth(self, username, password, namespace, repository, expected_code=200,
                scopes=None):
        """Request a token for the given scopes and, on success, store it in `self.jwt`.

        `scopes` is an optional list of scope names (e.g. 'push', 'pull'); omitting it
        requests an empty scope list.
        """
        scopes = scopes or []
        auth = (username, password)
        params = {
            'account': username,
            'scope': 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes)),
            'service': app.config['SERVER_HOSTNAME'],
        }

        response = self.conduct('GET', '/v2/auth', params=params, auth=auth,
                                expected_code=expected_code)

        if expected_code == 200:
            response_json = json.loads(response.text)
            self.assertIsNotNone(response_json.get('token'))
            self.jwt = response_json['token']


class V2RegistryPushMixin(V2RegistryMixin):
    def do_push(self, namespace, repository, username, password, images):
        """Push a single fake layer and a signed manifest tagged 'latest' via V2.

        NOTE: the `images` argument is accepted for signature parity with the V1
        push mixin but is replaced by a fixed fake layer below.
        """
        # Ping!
        self.v2_ping()

        # Auth.
        self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'])

        # Build a fake manifest.
        images = [('somelayer', 'some fake data')]

        tag_name = 'latest'
        builder = SignedManifestBuilder(namespace, repository, tag_name)
        for image_id, contents in images:
            checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
            builder.add_layer(checksum, json.dumps({'id': image_id, 'data': contents}))

        # Push the image's layers.
        for image_id, contents in images:
            # Layer data should not yet exist.
            checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
            self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum),
                         expected_code=404, auth='jwt')

            # Start a new upload of the layer data.
            response = self.conduct('POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repository),
                                    expected_code=202, auth='jwt')

            # conduct() prepends the server URL, so strip it from the returned Location.
            location = response.headers['Location'][len(self.get_server_url()):]

            # PATCH the image data into the layer.
            self.conduct('PATCH', location, data=contents, expected_code=204, auth='jwt')

            # Finish the layer upload with a PUT.
            self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201,
                         auth='jwt')

        # Write the manifest, signed with a throwaway RSA key.
        new_key = RSA.generate(2048)
        jwk = RSAKey(key=new_key)
        manifest = builder.build(jwk)

        self.conduct('PUT', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
                     data=manifest.bytes, expected_code=202,
                     headers={'Content-Type': 'application/json'}, auth='jwt')


class V2RegistryPullMixin(V2RegistryMixin):
    def do_pull(self, namespace, repository, username=None, password='password',
                expected_code=200):
        """Fetch the 'latest' manifest and every blob it lists using the V2 protocol.

        When `expected_code` is not 200, only the auth request is made (and asserted
        to return that code).
        """
        # Ping!
        self.v2_ping()

        # Auth.
        self.do_auth(username, password, namespace, repository, scopes=['pull'],
                     expected_code=expected_code)
        if expected_code != 200:
            return

        # Retrieve the manifest for the tag.
        tag_name = 'latest'
        response = self.conduct('GET',
                                '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
                                auth='jwt')
        manifest_data = json.loads(response.text)

        for layer in manifest_data['fsLayers']:
            blob_id = layer['blobSum']
            self.conduct('GET', '/v2/%s/%s/blobs/%s' % (namespace, repository, blob_id),
                         expected_code=200, auth='jwt')


class RegistryTestsMixin(object):
    """Protocol-agnostic tests; concrete classes supply `do_push` and `do_pull`."""

    def test_pull_publicrepo_anonymous(self):
        # Add a new repository under the public user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('public', 'newrepo', 'public', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo anonymously, which should fail
        # (since it is private).
        self.do_pull('public', 'newrepo', expected_code=403)

        # Make the repository public.
        self.conduct_api_login('public', 'password')
        self.change_repo_visibility('public', 'newrepo', 'public')
        self.clearSession()

        # Pull the repository anonymously, which should succeed because the repository
        # is public.
        self.do_pull('public', 'newrepo')

    def test_pull_publicrepo_devtable(self):
        # Add a new repository under the public user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('public', 'newrepo', 'public', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo as devtable, which should fail
        # as it belongs to public.
        self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

        # Make the repository public.
        self.conduct_api_login('public', 'password')
        self.change_repo_visibility('public', 'newrepo', 'public')
        self.clearSession()

        # Pull the repository as devtable, which should succeed because the repository
        # is public.
        self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_pull_private_repo(self):
        # Add a new repository under the devtable user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo as public, which should fail as
        # it belongs to devtable.
        self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)

        # Pull the repository as devtable, which should succeed because the repository is
        # owned by devtable.
        self.do_pull('devtable', 'newrepo', 'devtable', 'password')

    def test_public_no_anonymous_access_with_auth(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository
            # to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as devtable, which should
            # fail as it belongs to public.
            self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

            # Make the repository public.
            self.conduct_api_login('public', 'password')
            self.change_repo_visibility('public', 'newrepo', 'public')
            self.clearSession()

            # Pull the repository as devtable, which should succeed because the
            # repository is public.
            self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_private_no_anonymous_access(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository
            # to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as devtable, which should
            # fail as it belongs to public.
            self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

            # Pull the repository as public, which should succeed because the repository
            # is owned by public.
            self.do_pull('public', 'newrepo', 'public', 'password')

    def test_public_no_anonymous_access_no_auth(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository
            # to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as anonymous, which should
            # fail as it is private.
            self.do_pull('public', 'newrepo', expected_code=401)

            # Make the repository public.
            self.conduct_api_login('public', 'password')
            self.change_repo_visibility('public', 'newrepo', 'public')
            self.clearSession()

            # Try again to pull the (currently public) repo as anonymous, which should
            # fail as anonymous access is disabled.
            self.do_pull('public', 'newrepo', expected_code=401)

            # Pull the repository as public, which should succeed because the repository
            # is owned by public.
            self.do_pull('public', 'newrepo', 'public', 'password')

            # Pull the repository as devtable, which should succeed because the
            # repository is public.
            self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_create_repo_creator_user(self):
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'creator', 'password', images)

        # Pull the repository as devtable, which should succeed because the repository is
        # owned by the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

    def test_create_repo_robot_owner(self):
        # Lookup the robot's password.
        self.conduct_api_login('devtable', 'password')
        resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
        robot_token = json.loads(resp.text)['token']

        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images)

        # Pull the repository as devtable, which should succeed because the repository is
        # owned by the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

    def test_create_repo_robot_creator(self):
        # Lookup the robot's password.
        self.conduct_api_login('devtable', 'password')
        resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
        robot_token = json.loads(resp.text)['token']

        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images)

        # Pull the repository as devtable, which should succeed because the repository is
        # owned by the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')


class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
                      RegistryTestCaseMixin, LiveServerTestCase):
    """ Tests for V1 registry. """


class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
                      RegistryTestCaseMixin, LiveServerTestCase):
    """ Tests for V2 registry. """


class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
                                RegistryTestCaseMixin, LiveServerTestCase):
    """ Tests for V1 push, V2 pull registry. """


class V1PullV2PushRegistryTests(V1RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
                                RegistryTestCaseMixin, LiveServerTestCase):
    """ Tests for V1 pull, V2 push registry. """


if __name__ == '__main__':
    unittest.main()