"""Live-server integration tests for the V1 and V2 registry endpoints.

The tests in ``RegistryTestsMixin`` are protocol-agnostic: they only rely on
``do_push``, ``do_pull``, ``conduct`` and ``clearSession``, which are supplied
by ``V1RegistryMixin`` or ``V2RegistryMixin``. The concrete test cases at the
bottom of the file combine the shared tests with one protocol mixin each.
"""
import unittest
import requests
import os

from flask import request, jsonify
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase

from app import app
from endpoints.v1 import v1_bp
from endpoints.v2 import v2_bp
from endpoints.v2.manifest import SignedManifestBuilder
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token

import endpoints.decorated
import json
import features
import hashlib

import tarfile

from jwkest.jws import SIGNER_ALGS
from jwkest.jwk import RSAKey
from Crypto.PublicKey import RSA

from cStringIO import StringIO
from digest.checksums import compute_simple

try:
    app.register_blueprint(v1_bp, url_prefix='/v1')
    app.register_blueprint(v2_bp, url_prefix='/v2')
    app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
    # Blueprint was already registered
    pass


# Add a test blueprint for generating CSRF tokens and setting feature flags.
testbp = Blueprint('testbp', __name__)


@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
    """Return a freshly generated CSRF token as the response body."""
    return generate_csrf_token()


# The URL variable is required: the view takes `feature_name` and TestFeature
# posts to '/__test/feature/' + <flag name>.
@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
    """Set the feature flag `feature_name` to the posted JSON `value`.

    Responds with JSON of the form ``{'old_value': ...}`` holding the value the
    flag had before the change, so the caller can restore it later.
    """
    feature = features._FEATURES[feature_name]
    old_value = feature.value
    feature.value = request.get_json()['value']
    return jsonify({'old_value': old_value})


app.register_blueprint(testbp, url_prefix='/__test')


class TestFeature(object):
    """Helper object which temporarily sets the value of a feature flag.

    Intended to be used as a context manager: on entry the flag is set to
    `test_value` through the test blueprint, and on exit the previous value
    reported by the server is written back.
    """

    _JSON_HEADERS = {'Content-Type': 'application/json'}

    def __init__(self, test_case, feature_flag, test_value):
        self.test_case = test_case
        self.feature_flag = feature_flag
        self.test_value = test_value
        self.old_value = None

    def _post_value(self, value):
        """POST `value` for this flag to the test blueprint and return the response."""
        return self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
                                      data=json.dumps(dict(value=value)),
                                      headers=dict(self._JSON_HEADERS))

    def __enter__(self):
        result = self._post_value(self.test_value)
        # Remember the previous value so that __exit__ can restore it.
        self.old_value = json.loads(result.text)['old_value']

    def __exit__(self, exc_type, exc_value, traceback):
        self._post_value(self.old_value)


class V1RegistryMixin(object):
    """Implements push/pull against the V1 registry protocol endpoints."""

    def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
        """Issue a request against the live server and assert on its status code.

        The current docker token is always sent as ``X-Docker-Token``; the stored
        signature is sent as the ``Authorization`` header unless explicit `auth`
        credentials are given. Any ``www-authenticate`` / ``X-Docker-Token``
        response headers are stored for use by subsequent requests.

        Returns the response object.
        """
        # Copy so that a dict supplied by the caller is never mutated.
        headers = dict(headers or {})
        headers['X-Docker-Token'] = self.docker_token

        if self.signature and not auth:
            headers['Authorization'] = 'token ' + self.signature

        response = self.session.request(method, self.get_server_url() + url, headers=headers,
                                        data=data, auth=auth,
                                        params=dict(_csrf_token=self.csrf_token))
        if response.status_code != expected_code:
            # Surface the body to make the assertion failure below easier to debug.
            print(response.text)

        if 'www-authenticate' in response.headers:
            self.signature = response.headers['www-authenticate']

        if 'X-Docker-Token' in response.headers:
            self.docker_token = response.headers['X-Docker-Token']

        self.assertEqual(response.status_code, expected_code)
        return response

    def ping(self):
        """GET the V1 ping endpoint, expecting a 200."""
        self.conduct('GET', '/v1/_ping')

    def do_push(self, namespace, repository, username, password, images):
        """Push `images` (a list of dicts with an 'id' key) and tag the first as 'latest'."""
        auth = (username, password)

        # Ping!
        self.ping()

        # PUT /v1/repositories/{namespace}/{repository}/
        data = [{"id": image['id']} for image in images]
        self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
                     data=json.dumps(data), auth=auth, expected_code=201)

        for image in images:
            image_id = image['id']
            image_json = json.dumps(image)

            # PUT /v1/images/{imageID}/json
            self.conduct('PUT', '/v1/images/%s/json' % image_id, data=image_json)

            # PUT /v1/images/{imageID}/layer
            # The layer is a gzipped tar stream containing a single regular file named
            # 'image_name' whose contents are the image ID.
            tar_file_info = tarfile.TarInfo(name='image_name')
            tar_file_info.type = tarfile.REGTYPE
            tar_file_info.size = len(image_id)

            layer_data = StringIO()

            tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
            tar_file.addfile(tar_file_info, StringIO(image_id))
            # The archive must be closed before reading the buffer so it is fully flushed.
            tar_file.close()

            layer_bytes = layer_data.getvalue()
            layer_data.close()

            self.conduct('PUT', '/v1/images/%s/layer' % image_id, data=StringIO(layer_bytes))

            # PUT /v1/images/{imageID}/checksum
            checksum = compute_simple(StringIO(layer_bytes), image_json)
            self.conduct('PUT', '/v1/images/%s/checksum' % image_id,
                         headers={'X-Docker-Checksum-Payload': checksum})

        # PUT /v1/repositories/{namespace}/{repository}/tags/latest
        self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
                     data='"' + images[0]['id'] + '"')

        # PUT /v1/repositories/{namespace}/{repository}/images
        self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
                     expected_code=204)

    def do_pull(self, namespace, repository, username=None, password='password',
                expected_code=200):
        """Pull every tagged image of the repository.

        `expected_code` applies to the initial image listing; if it is not 200 the
        pull stops after that request.
        """
        auth = None
        if username:
            auth = (username, password)

        # Ping!
        self.ping()

        prefix = '/v1/repositories/%s/%s/' % (namespace, repository)

        # GET /v1/repositories/{namespace}/{repository}/images
        self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
        if expected_code != 200:
            return

        # GET /v1/repositories/{namespace}/{repository}/tags
        result = json.loads(self.conduct('GET', prefix + 'tags').text)

        for image_id in result.values():
            # /v1/images/{imageID}/{ancestry, json, layer}
            image_prefix = '/v1/images/%s/' % image_id
            self.conduct('GET', image_prefix + 'ancestry')
            self.conduct('GET', image_prefix + 'json')
            self.conduct('GET', image_prefix + 'layer')

    def clearSession(self):
        """Reset the V1 credentials carried between requests."""
        self.signature = None
        self.docker_token = 'true'


class V2RegistryMixin(object):
    """Implements push/pull against the V2 registry protocol endpoints."""

    def conduct(self, method, url, headers=None, params=None, data=None, auth=None,
                expected_code=200):
        """Issue a request against the live server and assert on its status code.

        The CSRF token is always added to the query parameters. The stored docker
        token is sent as a ``Bearer`` authorization header unless explicit `auth`
        credentials are given.

        Returns the response object.
        """
        # Copy so that dicts supplied by the caller are never mutated.
        headers = dict(headers or {})
        params = dict(params or {})
        params['_csrf_token'] = self.csrf_token

        if self.docker_token and not auth:
            headers['Authorization'] = 'Bearer ' + self.docker_token

        response = self.session.request(method, self.get_server_url() + url, headers=headers,
                                        data=data, auth=auth, params=params)
        if response.status_code != expected_code:
            # Surface the body to make the assertion failure below easier to debug.
            print(response.text)

        self.assertEqual(response.status_code, expected_code)
        return response

    def ping(self):
        """GET the V2 base endpoint; a 401 is expected while no token is held."""
        self.conduct('GET', '/v2/', expected_code=200 if self.docker_token else 401)

    def do_auth(self, username, password, namespace, repository, expected_code=200,
                scopes=None):
        """Request a token for the repository with the given `scopes`.

        When `expected_code` is 200 the response must contain a token, which is then
        stored as `self.docker_token` for subsequent requests.
        """
        auth = (username, password)
        params = {
            'account': username,
            'scope': 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes or [])),
            'service': 'quay'
        }

        response = self.conduct('GET', '/v2/auth', params=params, auth=auth,
                                expected_code=expected_code)

        if expected_code == 200:
            response_json = json.loads(response.text)
            self.assertIsNotNone(response_json.get('token'))
            self.docker_token = response_json['token']

    def do_push(self, namespace, repository, username, password, images):
        """Push a single fake layer and a signed manifest tagged 'latest'.

        NOTE: the `images` argument is ignored; it is replaced below by one
        hard-coded fake layer.
        """
        # Ping!
        self.ping()

        # Auth.
        self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'])

        # Build a fake manifest.
        images = [('somelayer', 'some fake data')]

        tag_name = 'latest'
        builder = SignedManifestBuilder(namespace, repository, tag_name)
        for image_id, contents in images:
            checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
            builder.add_layer(checksum, json.dumps({'id': image_id, 'data': contents}))

        # Push the image's layers.
        for image_id, contents in images:
            # Layer data should not yet exist.
            checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
            self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum),
                         expected_code=404)

            # Start a new upload of the layer data.
            response = self.conduct('POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repository),
                                    expected_code=202)

            # Strip the server URL prefix, since conduct() prepends it again.
            location = response.headers['Location'][len(self.get_server_url()):]

            # PATCH the image data into the layer.
            self.conduct('PATCH', location, data=contents, expected_code=204)

            # Finish the layer upload with a PUT.
            self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201)

        # Write the manifest.
        new_key = RSA.generate(2048)
        jwk = RSAKey(key=new_key)
        manifest = builder.build(jwk)

        self.conduct('PUT', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
                     data=manifest.bytes, expected_code=202,
                     headers={'Content-Type': 'application/json'})

    def do_pull(self, namespace, repository, username=None, password='password',
                expected_code=200):
        """Fetch the 'latest' manifest and every blob listed in its ``fsLayers``.

        `expected_code` applies to the auth request; if it is not 200 the pull stops
        after that request.
        """
        # Ping!
        self.ping()

        # Auth.
        # NOTE(review): credentials are sent even when `username` is None (anonymous
        # pulls); confirm the auth endpoint is meant to treat that as anonymous.
        self.do_auth(username, password, namespace, repository, scopes=['pull'],
                     expected_code=expected_code)
        if expected_code != 200:
            return

        # Retrieve the manifest for the tag.
        tag_name = 'latest'
        response = self.conduct('GET', '/v2/%s/%s/manifests/%s' % (namespace, repository,
                                                                   tag_name))
        manifest_data = json.loads(response.text)

        for layer in manifest_data['fsLayers']:
            blob_id = layer['blobSum']
            self.conduct('GET', '/v2/%s/%s/blobs/%s' % (namespace, repository, blob_id),
                         expected_code=200)

    def clearSession(self):
        """Drop the bearer token carried between requests."""
        self.docker_token = None


class RegistryTestCaseMixin(object):
    """Shared fixture logic: app creation, database reset and API helpers."""

    maxDiff = None

    def create_app(self):
        """Return the application under test with TESTING and DEBUG enabled."""
        app.config['TESTING'] = True
        app.config['DEBUG'] = True
        return app

    def setUp(self):
        # Note: We cannot use the normal savepoint-based DB setup here because we are accessing
        # different app instances remotely via a live webserver, which is multiprocess. Therefore,
        # we completely clear the database between tests.
        wipe_database()
        initialize_database()
        populate_database()

        self.clearTestSession()

    def clearTestSession(self):
        """Start a fresh HTTP session, reset protocol credentials and load a CSRF token."""
        self.session = requests.Session()
        self.clearSession()

        # Load the CSRF token. It must exist (empty) first because conduct() reads it.
        self.csrf_token = ''
        self.csrf_token = self.conduct('GET', '/__test/csrf').text

    def conduct_api_login(self, username, password):
        """Sign in to the API as `username` using the current session."""
        self.conduct('POST', '/api/v1/signin',
                     data=json.dumps(dict(username=username, password=password)),
                     headers={'Content-Type': 'application/json'})

    def change_repo_visibility(self, repository, namespace, visibility):
        """Change the visibility of a repository through the API.

        NOTE(review): the first two parameter names look swapped. They are
        interpolated into the URL in positional order, and callers in this file pass
        the namespace first and the repository name second. The names are kept as-is
        to avoid breaking keyword callers.
        """
        self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository,
                                                                            namespace),
                     data=json.dumps(dict(visibility=visibility)),
                     headers={'Content-Type': 'application/json'})


class RegistryTestsMixin(object):
    """Protocol-independent registry tests, run once per protocol mixin."""

    def test_pull_publicrepo_anonymous(self):
        # Add a new repository under the public user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('public', 'newrepo', 'public', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo anonymously, which should fail (since it
        # is private)
        self.do_pull('public', 'newrepo', expected_code=403)

        # Make the repository public.
        self.conduct_api_login('public', 'password')
        self.change_repo_visibility('public', 'newrepo', 'public')
        self.clearSession()

        # Pull the repository anonymously, which should succeed because the repository is public.
        self.do_pull('public', 'newrepo')

    def test_pull_publicrepo_devtable(self):
        # Add a new repository under the public user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('public', 'newrepo', 'public', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo as devtable, which should fail as it
        # belongs to public.
        self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

        # Make the repository public.
        self.conduct_api_login('public', 'password')
        self.change_repo_visibility('public', 'newrepo', 'public')
        self.clearSession()

        # Pull the repository as devtable, which should succeed because the repository is public.
        self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_pull_private_repo(self):
        # Add a new repository under the devtable user, so we have a real repository to pull.
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
        self.clearSession()

        # First try to pull the (currently private) repo as public, which should fail as it
        # belongs to devtable.
        self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)

        # Pull the repository as devtable, which should succeed because the repository is owned
        # by devtable.
        self.do_pull('devtable', 'newrepo', 'devtable', 'password')

    def test_public_no_anonymous_access_with_auth(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as devtable, which should fail as it
            # belongs to public.
            self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

            # Make the repository public.
            self.conduct_api_login('public', 'password')
            self.change_repo_visibility('public', 'newrepo', 'public')
            self.clearSession()

            # Pull the repository as devtable, which should succeed because the repository is
            # public.
            self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_private_no_anonymous_access(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as devtable, which should fail as it
            # belongs to public.
            self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

            # Pull the repository as public, which should succeed because the repository is owned
            # by public.
            self.do_pull('public', 'newrepo', 'public', 'password')

    def test_public_no_anonymous_access_no_auth(self):
        # Turn off anonymous access.
        with TestFeature(self, 'ANONYMOUS_ACCESS', False):
            # Add a new repository under the public user, so we have a real repository to pull.
            images = [{
                'id': 'onlyimagehere'
            }]
            self.do_push('public', 'newrepo', 'public', 'password', images)
            self.clearSession()

            # First try to pull the (currently private) repo as anonymous, which should fail as it
            # is private.
            self.do_pull('public', 'newrepo', expected_code=401)

            # Make the repository public.
            self.conduct_api_login('public', 'password')
            self.change_repo_visibility('public', 'newrepo', 'public')
            self.clearSession()

            # Try again to pull the (currently public) repo as anonymous, which should fail as
            # anonymous access is disabled.
            self.do_pull('public', 'newrepo', expected_code=401)

            # Pull the repository as public, which should succeed because the repository is owned
            # by public.
            self.do_pull('public', 'newrepo', 'public', 'password')

            # Pull the repository as devtable, which should succeed because the repository is
            # public.
            self.do_pull('public', 'newrepo', 'devtable', 'password')

    def test_create_repo_creator_user(self):
        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'creator', 'password', images)

        # Pull the repository as devtable, which should succeed because the repository is owned by
        # the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

    def test_create_repo_robot_owner(self):
        # Lookup the robot's password.
        self.conduct_api_login('devtable', 'password')
        resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
        robot_token = json.loads(resp.text)['token']

        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images)

        # Pull the repository as devtable, which should succeed because the repository is owned by
        # the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

    def test_create_repo_robot_creator(self):
        # Lookup the robot's password.
        self.conduct_api_login('devtable', 'password')
        resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
        robot_token = json.loads(resp.text)['token']

        images = [{
            'id': 'onlyimagehere'
        }]
        self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images)

        # Pull the repository as devtable, which should succeed because the repository is owned by
        # the org.
        self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')


class V1RegistryTests(V1RegistryMixin, RegistryTestsMixin, RegistryTestCaseMixin,
                      LiveServerTestCase):
    """ Tests for V1 registry. """


class V2RegistryTests(V2RegistryMixin, RegistryTestsMixin, RegistryTestCaseMixin,
                      LiveServerTestCase):
    """ Tests for V2 registry. """


if __name__ == '__main__':
    unittest.main()