import unittest import requests from flask import request, jsonify from flask.blueprints import Blueprint from flask.ext.testing import LiveServerTestCase from app import app from endpoints.registry import registry from endpoints.index import index from endpoints.tags import tags from endpoints.api import api_bp from initdb import wipe_database, initialize_database, populate_database from endpoints.csrf import generate_csrf_token import endpoints.decorated import json import features import tarfile from cStringIO import StringIO from util.checksums import compute_simple try: app.register_blueprint(index, url_prefix='/v1') app.register_blueprint(tags, url_prefix='/v1') app.register_blueprint(registry, url_prefix='/v1') app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # Blueprint was already registered pass # Add a test blueprint for generating CSRF tokens and setting feature flags. testbp = Blueprint('testbp', __name__) @testbp.route('/csrf', methods=['GET']) def generate_csrf(): return generate_csrf_token() @testbp.route('/feature/', methods=['POST']) def set_feature(feature_name): import features old_value = features._FEATURES[feature_name].value features._FEATURES[feature_name].value = request.get_json()['value'] return jsonify({'old_value': old_value}) app.register_blueprint(testbp, url_prefix='/__test') class TestFeature(object): """ Helper object which temporarily sets the value of a feature flag. """ def __init__(self, test_case, feature_flag, test_value): self.test_case = test_case self.feature_flag = feature_flag self.test_value = test_value self.old_value = None def __enter__(self): result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.test_value)), headers={'Content-Type': 'application/json'}) result_data = json.loads(result.text) self.old_value = result_data['old_value'] def __exit__(self, type, value, traceback): self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.old_value)), headers={'Content-Type': 'application/json'}) class RegistryTestCase(LiveServerTestCase): maxDiff = None def create_app(self): app.config['TESTING'] = True return app def setUp(self): # Note: We cannot use the normal savepoint-based DB setup here because we are accessing # different app instances remotely via a live webserver, which is multiprocess. Therefore, we # completely clear the database between tests. wipe_database() initialize_database() populate_database() self.clearSession() def clearSession(self): self.session = requests.Session() self.signature = None self.docker_token = 'true' # Load the CSRF token. self.csrf_token = '' self.csrf_token = self.conduct('GET', '/__test/csrf').text def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200): headers = headers or {} headers['X-Docker-Token'] = self.docker_token if self.signature and not auth: headers['Authorization'] = 'token ' + self.signature response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data, auth=auth, params=dict(_csrf_token=self.csrf_token)) if response.status_code != expected_code: print response.text if 'www-authenticate' in response.headers: self.signature = response.headers['www-authenticate'] if 'X-Docker-Token' in response.headers: self.docker_token = response.headers['X-Docker-Token'] self.assertEquals(response.status_code, expected_code) return response def ping(self): self.conduct('GET', '/v1/_ping') def do_login(self, username, password='password'): self.ping() result = self.conduct('POST', '/v1/users/', data=json.dumps(dict(username=username, password=password, email='bar@example.com')), headers={"Content-Type": "application/json"}, expected_code=400) self.assertEquals(result.text, '"Username or email already exists"') self.conduct('GET', '/v1/users/', auth=(username, password)) def do_push(self, namespace, repository, username, password, images): auth = (username, password) # Ping! self.ping() # PUT /v1/repositories/{namespace}/{repository}/ data = [{"id": image['id']} for image in images] self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository), data=json.dumps(data), auth=auth, expected_code=201) for image in images: # PUT /v1/images/{imageID}/json self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image)) # PUT /v1/images/{imageID}/layer tar_file_info = tarfile.TarInfo(name='image_name') tar_file_info.type = tarfile.REGTYPE tar_file_info.size = len(image['id']) layer_data = StringIO() tar_file = tarfile.open(fileobj=layer_data, mode='w|gz') tar_file.addfile(tar_file_info, StringIO(image['id'])) tar_file.close() layer_bytes = layer_data.getvalue() layer_data.close() self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes)) # PUT /v1/images/{imageID}/checksum checksum = compute_simple(StringIO(layer_bytes), json.dumps(image)) self.conduct('PUT', '/v1/images/%s/checksum' % image['id'], headers={'X-Docker-Checksum-Payload': checksum}) # PUT /v1/repositories/{namespace}/{repository}/tags/latest self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository), data='"' + images[0]['id'] + '"') # PUT /v1/repositories/{namespace}/{repository}/images self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository), expected_code=204) def do_pull(self, namespace, repository, username=None, password='password', expected_code=200): auth = None if username: auth = (username, password) # Ping! self.ping() prefix = '/v1/repositories/%s/%s/' % (namespace, repository) # GET /v1/repositories/{namespace}/{repository}/ self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code) if expected_code != 200: return # GET /v1/repositories/{namespace}/{repository}/ result = json.loads(self.conduct('GET', prefix + 'tags').text) for image_id in result.values(): # /v1/images/{imageID}/{ancestry, json, layer} image_prefix = '/v1/images/%s/' % image_id self.conduct('GET', image_prefix + 'ancestry') self.conduct('GET', image_prefix + 'json') self.conduct('GET', image_prefix + 'layer') def conduct_api_login(self, username, password): self.conduct('POST', '/api/v1/signin', data=json.dumps(dict(username=username, password=password)), headers={'Content-Type': 'application/json'}) def change_repo_visibility(self, repository, namespace, visibility): self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace), data=json.dumps(dict(visibility=visibility)), headers={'Content-Type': 'application/json'}) class RegistryTests(RegistryTestCase): def test_pull_publicrepo_anonymous(self): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) self.do_pull('public', 'newrepo', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository anonymously, which should succeed because the repository is public. self.do_pull('public', 'newrepo') def test_pull_publicrepo_devtable(self): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_pull_private_repo(self): # Add a new repository under the devtable user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('devtable', 'newrepo', 'devtable', 'password', images) self.clearSession() # First try to pull the (currently private) repo as public, which should fail as it belongs # to devtable. self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403) # Pull the repository as devtable, which should succeed because the repository is owned by # devtable. self.do_pull('devtable', 'newrepo', 'devtable', 'password') def test_public_no_anonymous_access_with_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_private_no_anonymous_access(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') def test_public_no_anonymous_access_no_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. images = [{ 'id': 'onlyimagehere' }] self.do_push('public', 'newrepo', 'public', 'password', images) self.clearSession() # First try to pull the (currently private) repo as anonymous, which should fail as it # is private. self.do_pull('public', 'newrepo', expected_code=401) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Try again to pull the (currently public) repo as anonymous, which should fail as # anonymous access is disabled. self.do_pull('public', 'newrepo', expected_code=401) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') if __name__ == '__main__': unittest.main()