import unittest import requests from flask import request, jsonify from flask.blueprints import Blueprint from flask.ext.testing import LiveServerTestCase from app import app from data.database import close_db_filter, configure from endpoints.v1 import v1_bp from endpoints.api import api_bp from initdb import wipe_database, initialize_database, populate_database from endpoints.csrf import generate_csrf_token from tempfile import NamedTemporaryFile import endpoints.decorated import json import features import tarfile import shutil from cStringIO import StringIO from digest.checksums import compute_simple try: app.register_blueprint(v1_bp, url_prefix='/v1') app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # Blueprint was already registered pass # Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the # DB connection. testbp = Blueprint('testbp', __name__) @testbp.route('/csrf', methods=['GET']) def generate_csrf(): return generate_csrf_token() @testbp.route('/feature/', methods=['POST']) def set_feature(feature_name): import features old_value = features._FEATURES[feature_name].value features._FEATURES[feature_name].value = request.get_json()['value'] return jsonify({'old_value': old_value}) @testbp.route('/reloaddb', methods=['POST']) def reload_db(): # Close any existing connection. close_db_filter(None) # Reload the database config. configure(app.config) return 'OK' app.register_blueprint(testbp, url_prefix='/__test') class TestFeature(object): """ Helper object which temporarily sets the value of a feature flag. """ def __init__(self, test_case, feature_flag, test_value): self.test_case = test_case self.feature_flag = feature_flag self.test_value = test_value self.old_value = None def __enter__(self): result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.test_value)), headers={'Content-Type': 'application/json'}) result_data = json.loads(result.text) self.old_value = result_data['old_value'] def __exit__(self, type, value, traceback): self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.old_value)), headers={'Content-Type': 'application/json'}) _PORT_NUMBER = 5001 _CLEAN_DATABASE_PATH = None def get_new_database_uri(): # If a clean copy of the database has not yet been created, create one now. global _CLEAN_DATABASE_PATH if not _CLEAN_DATABASE_PATH: wipe_database() initialize_database() populate_database() close_db_filter(None) # Save the path of the clean database. _CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name # Create a new temp file to be used as the actual backing database for the test. # Note that we have the close() the file to ensure we can copy to it via shutil. local_db_file = NamedTemporaryFile(delete=True) local_db_file.close() # Copy the clean database to the path. shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name) return 'sqlite:///{0}'.format(local_db_file.name) class RegistryTestCase(LiveServerTestCase): maxDiff = None def create_app(self): global _PORT_NUMBER _PORT_NUMBER = _PORT_NUMBER + 1 app.config['TESTING'] = True app.config['LIVESERVER_PORT'] = _PORT_NUMBER app.config['DB_URI'] = get_new_database_uri() return app def setUp(self): self.clearSession() # Tell the remote running app to reload the database. By default, the app forks from the # current context and has already loaded the DB config with the *original* DB URL. We call # the remote reload method to force it to pick up the changes to DB_URI set in the create_app # method. self.conduct('POST', '/__test/reloaddb') def clearSession(self): self.session = requests.Session() self.signature = None self.docker_token = 'true' # Load the CSRF token. self.csrf_token = '' self.csrf_token = self.conduct('GET', '/__test/csrf').text def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200): headers = headers or {} headers['X-Docker-Token'] = self.docker_token if self.signature and not auth: headers['Authorization'] = 'token ' + self.signature response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data, auth=auth, params=dict(_csrf_token=self.csrf_token)) if response.status_code != expected_code: print response.text if 'www-authenticate' in response.headers: self.signature = response.headers['www-authenticate'] if 'X-Docker-Token' in response.headers: self.docker_token = response.headers['X-Docker-Token'] self.assertEquals(response.status_code, expected_code) return response def ping(self): self.conduct('GET', '/v1/_ping') def do_login(self, username, password='password'): self.ping() result = self.conduct('POST', '/v1/users/', data=json.dumps(dict(username=username, password=password, email='bar@example.com')), headers={"Content-Type": "application/json"}, expected_code=400) self.assertEquals(result.text, '"Username or email already exists"') self.conduct('GET', '/v1/users/', auth=(username, password)) def do_push(self, namespace, repository, username, password, images, expected_code=201): auth = (username, password) # Ping! self.ping() # PUT /v1/repositories/{namespace}/{repository}/ data = [{"id": image['id']} for image in images] self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository), data=json.dumps(data), auth=auth, expected_code=expected_code) if expected_code != 201: return for image in images: # PUT /v1/images/{imageID}/json self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image)) # PUT /v1/images/{imageID}/layer tar_file_info = tarfile.TarInfo(name='image_name') tar_file_info.type = tarfile.REGTYPE tar_file_info.size = len(image['id']) layer_data = StringIO() tar_file = tarfile.open(fileobj=layer_data, mode='w|gz') tar_file.addfile(tar_file_info, StringIO(image['id'])) tar_file.close() layer_bytes = layer_data.getvalue() layer_data.close() self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes)) # PUT /v1/images/{imageID}/checksum checksum = compute_simple(StringIO(layer_bytes), json.dumps(image)) self.conduct('PUT', '/v1/images/%s/checksum' % image['id'], headers={'X-Docker-Checksum-Payload': checksum}) # PUT /v1/repositories/{namespace}/{repository}/tags/latest self.do_tag(namespace, repository, 'latest', images[0]['id']) # PUT /v1/repositories/{namespace}/{repository}/images self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository), expected_code=204) def do_pull(self, namespace, repository, username=None, password='password', expected_code=200): auth = None if username: auth = (username, password) # Ping! self.ping() prefix = '/v1/repositories/%s/%s/' % (namespace, repository) # GET /v1/repositories/{namespace}/{repository}/ self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code) if expected_code != 200: # Push was expected to fail, so nothing more to do for the push. return # GET /v1/repositories/{namespace}/{repository}/ result = json.loads(self.conduct('GET', prefix + 'tags').text) for image_id in result.values(): # /v1/images/{imageID}/{ancestry, json, layer} image_prefix = '/v1/images/%s/' % image_id self.conduct('GET', image_prefix + 'ancestry') self.conduct('GET', image_prefix + 'json') self.conduct('GET', image_prefix + 'layer') def do_tag(self, namespace, repository, tag, image_id, expected_code=200): self.conduct('PUT', '/v1/repositories/%s/%s/tags/%s' % (namespace, repository, tag), data='"%s"' % image_id, expected_code=expected_code) def conduct_api_login(self, username, password): self.conduct('POST', '/api/v1/signin', data=json.dumps(dict(username=username, password=password)), headers={'Content-Type': 'application/json'}) def change_repo_visibility(self, repository, namespace, visibility): self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace), data=json.dumps(dict(visibility=visibility)), headers={'Content-Type': 'application/json'}) class RegistryTests(RegistryTestCase): def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expected_code=400) def test_pull_publicrepo_anonymous(self): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) self.do_pull('public', 'newrepo', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository anonymously, which should succeed because the repository is public. self.do_pull('public', 'newrepo') def test_pull_publicrepo_devtable(self): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_pull_private_repo(self): # Add a new repository under the devtable user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('devtable', 'newrepo', 'devtable', 'password', images) self.clearSession() # First try to pull the (currently private) repo as public, which should fail as it belongs # to devtable. self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403) # Pull the repository as devtable, which should succeed because the repository is owned by # devtable. self.do_pull('devtable', 'newrepo', 'devtable', 'password') def test_public_no_anonymous_access_with_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_private_no_anonymous_access(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') def test_public_no_anonymous_access_no_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as anonymous, which should fail as it # is private. self.do_pull('public', 'newrepo', expected_code=401) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Try again to pull the (currently public) repo as anonymous, which should fail as # anonymous access is disabled. self.do_pull('public', 'newrepo', expected_code=401) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_create_repo_creator_user(self): images = [{ 'id': 'onlyimagehere' }] self.do_push('buynlarge', 'newrepo', 'creator', 'password', images) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_owner(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot') robot_token = json.loads(resp.text)['token'] images = [{ 'id': 'onlyimagehere' }] self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_creator(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot') robot_token = json.loads(resp.text)['token'] images = [{ 'id': 'onlyimagehere' }] self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_push_unicode_metadata(self): self.conduct_api_login('devtable', 'password') images = [{ 'id': 'onlyimagehere', 'comment': 'Pawe\xc5\x82 Kami\xc5\x84ski '.decode('utf-8') }] self.do_push('devtable', 'unicodetest', 'devtable', 'password', images) self.do_pull('devtable', 'unicodetest', 'devtable', 'password') def test_tag_validation(self): image_id = 'onlyimagehere' images = [{ 'id': image_id }] self.do_push('public', 'newrepo', 'public', 'password', images) self.do_tag('public', 'newrepo', '1', image_id) self.do_tag('public', 'newrepo', 'x' * 128, image_id) self.do_tag('public', 'newrepo', '', image_id, expected_code=400) self.do_tag('public', 'newrepo', 'x' * 129, image_id, expected_code=400) self.do_tag('public', 'newrepo', '.fail', image_id, expected_code=400) self.do_tag('public', 'newrepo', '-fail', image_id, expected_code=400) if __name__ == '__main__': unittest.main()