389 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			389 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import requests
 | |
| 
 | |
| from flask import request, jsonify
 | |
| from flask.blueprints import Blueprint
 | |
| from flask.ext.testing import LiveServerTestCase
 | |
| 
 | |
| from app import app
 | |
| from endpoints.v1 import v1_bp
 | |
| from endpoints.api import api_bp
 | |
| from initdb import wipe_database, initialize_database, populate_database
 | |
| from endpoints.csrf import generate_csrf_token
 | |
| 
 | |
| import endpoints.decorated
 | |
| import json
 | |
| import features
 | |
| 
 | |
| import tarfile
 | |
| 
 | |
| from cStringIO import StringIO
 | |
| from digest.checksums import compute_simple
 | |
| 
 | |
def _register_blueprint_once(blueprint, url_prefix):
  """ Registers `blueprint` on the app under `url_prefix`.

      A ValueError raised by the registration is treated as "already registered" and ignored,
      matching the handling this module has always applied.
  """
  try:
    app.register_blueprint(blueprint, url_prefix=url_prefix)
  except ValueError:
    # Blueprint was already registered
    pass


# Register each blueprint on its own, so that a failure to register one (because it is already
# registered) does not silently skip the registration of the other.
_register_blueprint_once(v1_bp, '/v1')
_register_blueprint_once(api_bp, '/api')
 | |
| 
 | |
| 
 | |
# Test-only blueprint: exposes endpoints for generating CSRF tokens and setting feature flags.
testbp = Blueprint('testbp', __name__)


@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
  """ Returns the result of generate_csrf_token() as the response. """
  token = generate_csrf_token()
  return token


@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
  """ Overwrites the value of the named feature with the 'value' field of the JSON request body.

      Responds with a JSON object whose 'old_value' field holds the value the feature had before
      the change.
  """
  import features

  flag = features._FEATURES[feature_name]
  previous = flag.value
  flag.value = request.get_json()['value']
  return jsonify({'old_value': previous})


# Mount the test endpoints under /__test.
app.register_blueprint(testbp, url_prefix='/__test')
 | |
| 
 | |
| 
 | |
class TestFeature(object):
  """ Helper object which temporarily sets the value of a feature flag.

      Intended to be used as a context manager: on entry the flag is set to `test_value` by
      POSTing to the /__test/feature/<flag> endpoint through the test case's conduct() method,
      and on exit the value reported back by that first call is POSTed again to restore it.
  """

  def __init__(self, test_case, feature_flag, test_value):
    # test_case: object providing conduct(method, url, data=..., headers=...).
    # feature_flag: name of the flag, appended to the endpoint URL.
    # test_value: value the flag should hold while the context is active.
    self.test_case = test_case
    self.feature_flag = feature_flag
    self.test_value = test_value
    self.old_value = None

  def _post_value(self, value):
    """ POSTs `value` for this flag to the test endpoint and returns the response. """
    return self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
                                  data=json.dumps(dict(value=value)),
                                  headers={'Content-Type': 'application/json'})

  def __enter__(self):
    result = self._post_value(self.test_value)

    # Remember the previous value so that it can be restored on exit.
    self.old_value = json.loads(result.text)['old_value']

    # Return the helper itself so that `with TestFeature(...) as feature` binds a usable object.
    return self

  def __exit__(self, type, value, traceback):
    # Restore the original value. Nothing is returned, so exceptions raised inside the `with`
    # body continue to propagate.
    self._post_value(self.old_value)
 | |
| 
 | |
class RegistryTestCase(LiveServerTestCase):
  """ Base test case which talks to the registry endpoints of a live test server over HTTP.

      Every test starts from a wiped and repopulated database and a fresh `requests` session.
      All requests go through conduct(), which carries the CSRF token, the docker token and the
      signature header from one request to the next.
  """
  maxDiff = None

  def create_app(self):
    """ Returns the application to serve, with the TESTING config flag turned on. """
    app.config['TESTING'] = True
    return app

  def setUp(self):
    # Note: We cannot use the normal savepoint-based DB setup here because we are accessing
    # different app instances remotely via a live webserver, which is multiprocess. Therefore, we
    # completely clear the database between tests.
    wipe_database()
    initialize_database()
    populate_database()

    self.clearSession()

  def clearSession(self):
    """ Resets all client-side state (HTTP session, signature, docker token, CSRF token). """
    # Close the session being replaced (if any), so its connections are not left open.
    previous_session = getattr(self, 'session', None)
    if previous_session is not None:
      previous_session.close()

    self.session = requests.Session()
    self.signature = None
    self.docker_token = 'true'

    # Load the CSRF token. The attribute must exist before the call, because conduct() sends it
    # as a query parameter on every request (including this one).
    self.csrf_token = ''
    self.csrf_token = self.conduct('GET', '/__test/csrf').text

  def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
    """ Performs an HTTP request against the live server and asserts on the status code.

        method: HTTP method name.
        url: path (relative to the live server's URL) to request.
        headers: optional dict of extra request headers; it is copied, never modified.
        data: optional request body.
        auth: optional (username, password) tuple. When given, the stored signature is not sent.
        expected_code: status code the response must have; the test fails otherwise.

        Returns the `requests` response object.
    """
    # Work on a copy so that the dict handed in by the caller is not mutated.
    headers = dict(headers or {})
    headers['X-Docker-Token'] = self.docker_token

    if self.signature and not auth:
      headers['Authorization'] = 'token ' + self.signature

    response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
                                    auth=auth, params=dict(_csrf_token=self.csrf_token))
    if response.status_code != expected_code:
      # Show the body of the unexpected response to make the upcoming failure easier to diagnose.
      print(response.text)

    # Remember the credentials handed back by the server for use on subsequent requests.
    if 'www-authenticate' in response.headers:
      self.signature = response.headers['www-authenticate']

    if 'X-Docker-Token' in response.headers:
      self.docker_token = response.headers['X-Docker-Token']

    self.assertEqual(response.status_code, expected_code)
    return response

  def ping(self):
    """ Issues a GET against the /v1/_ping endpoint, expecting a 200. """
    self.conduct('GET', '/v1/_ping')

  def do_login(self, username, password='password'):
    """ Logs in through the v1 user endpoints.

        First POSTs a user creation request, which must be rejected with a 400 and the
        "Username or email already exists" message, then GETs /v1/users/ with basic auth.
    """
    self.ping()
    result = self.conduct('POST', '/v1/users/',
                          data=json.dumps(dict(username=username, password=password,
                                               email='bar@example.com')),
                          headers={"Content-Type": "application/json"},
                          expected_code=400)

    self.assertEqual(result.text, '"Username or email already exists"')
    self.conduct('GET', '/v1/users/', auth=(username, password))

  def _build_layer_bytes(self, image_id):
    """ Returns the bytes of a gzipped tar stream containing a single regular file, named
        'image_name', whose contents are the given image ID.
    """
    tar_file_info = tarfile.TarInfo(name='image_name')
    tar_file_info.type = tarfile.REGTYPE
    tar_file_info.size = len(image_id)

    layer_data = StringIO()

    tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
    tar_file.addfile(tar_file_info, StringIO(image_id))
    tar_file.close()

    layer_bytes = layer_data.getvalue()
    layer_data.close()
    return layer_bytes

  def do_push(self, namespace, repository, username, password, images):
    """ Pushes the given images to namespace/repository via the v1 endpoints and tags the first
        image as 'latest'.

        images: non-empty list of dicts, each with at least an 'id' key. Each dict is uploaded
                as the image's JSON, and a small generated layer is uploaded alongside it.

        Only the initial repository PUT carries the basic auth credentials; the later requests
        rely on whatever signature/token conduct() has captured from earlier responses.
    """
    auth = (username, password)

    # Ping!
    self.ping()

    # PUT /v1/repositories/{namespace}/{repository}/
    data = [{"id": image['id']} for image in images]
    self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
                 data=json.dumps(data), auth=auth,
                 expected_code=201)

    for image in images:
      # PUT /v1/images/{imageID}/json
      self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))

      # PUT /v1/images/{imageID}/layer
      layer_bytes = self._build_layer_bytes(image['id'])
      self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))

      # PUT /v1/images/{imageID}/checksum
      checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
      self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
                   headers={'X-Docker-Checksum-Payload': checksum})

    # PUT /v1/repositories/{namespace}/{repository}/tags/latest
    self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
                 data='"' + images[0]['id'] + '"')

    # PUT /v1/repositories/{namespace}/{repository}/images
    self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
                 expected_code=204)

  def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
    """ Pulls namespace/repository via the v1 endpoints.

        username/password: basic auth credentials for the first request; when username is not
                           given the request is made without credentials.
        expected_code: status code expected from the first (images) request. When it is not 200
                       the pull stops after that request; otherwise the tags are listed and the
                       ancestry, JSON and layer of every tagged image are fetched.
    """
    auth = None
    if username:
      auth = (username, password)

    # Ping!
    self.ping()

    prefix = '/v1/repositories/%s/%s/' % (namespace, repository)

    # GET /v1/repositories/{namespace}/{repository}/images
    self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
    if expected_code != 200:
      return

    # GET /v1/repositories/{namespace}/{repository}/tags
    result = json.loads(self.conduct('GET', prefix + 'tags').text)

    for image_id in result.values():
      # /v1/images/{imageID}/{ancestry, json, layer}
      image_prefix = '/v1/images/%s/' % image_id
      self.conduct('GET', image_prefix + 'ancestry')
      self.conduct('GET', image_prefix + 'json')
      self.conduct('GET', image_prefix + 'layer')

  def conduct_api_login(self, username, password):
    """ Signs in to the API by POSTing the credentials as JSON to /api/v1/signin. """
    self.conduct('POST', '/api/v1/signin',
                 data=json.dumps(dict(username=username, password=password)),
                 headers={'Content-Type': 'application/json'})

  def change_repo_visibility(self, repository, namespace, visibility):
    """ POSTs the new visibility to the repository's changevisibility API endpoint.

        NOTE: the first two parameter names are misleading. The arguments are placed into the
        URL in positional order, and the callers in this file pass the namespace first and the
        repository name second. The names are kept as they are for compatibility.
    """
    self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
                 data=json.dumps(dict(visibility=visibility)),
                 headers={'Content-Type': 'application/json'})
 | |
| 
 | |
| 
 | |
class RegistryTests(RegistryTestCase):
  """ Push and pull permission scenarios, run against the live registry server. """

  def _push_new_repo(self, namespace, username, password):
    """ Pushes a single-image repository called 'newrepo' into `namespace`, so that there is a
        real repository to pull.
    """
    single_image = [{
      'id': 'onlyimagehere'
    }]
    self.do_push(namespace, 'newrepo', username, password, single_image)

  def _make_new_repo_public(self):
    """ Signs in as 'public', switches public/newrepo to public visibility and then starts a
        fresh session.
    """
    self.conduct_api_login('public', 'password')
    self.change_repo_visibility('public', 'newrepo', 'public')
    self.clearSession()

  def _fetch_robot_token(self, robot_url):
    """ Signs in as devtable and returns the 'token' field of the robot found at `robot_url`. """
    self.conduct_api_login('devtable', 'password')
    robot_info = self.conduct('GET', robot_url)
    return json.loads(robot_info.text)['token']

  def test_pull_publicrepo_anonymous(self):
    # Create the repository under the public user.
    self._push_new_repo('public', 'public', 'password')
    self.clearSession()

    # The repo is still private at this point, so an anonymous pull must be refused.
    self.do_pull('public', 'newrepo', expected_code=403)

    # Flip the repository to public.
    self._make_new_repo_public()

    # With the repository public, the anonymous pull goes through.
    self.do_pull('public', 'newrepo')

  def test_pull_publicrepo_devtable(self):
    # Create the repository under the public user.
    self._push_new_repo('public', 'public', 'password')
    self.clearSession()

    # The repo is still private and owned by public, so devtable must be refused.
    self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

    # Flip the repository to public.
    self._make_new_repo_public()

    # With the repository public, devtable can pull it.
    self.do_pull('public', 'newrepo', 'devtable', 'password')

  def test_pull_private_repo(self):
    # Create the repository under the devtable user.
    self._push_new_repo('devtable', 'devtable', 'password')
    self.clearSession()

    # The repo is private and owned by devtable, so the public user must be refused.
    self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)

    # The owner (devtable) can pull it.
    self.do_pull('devtable', 'newrepo', 'devtable', 'password')

  def test_public_no_anonymous_access_with_auth(self):
    # Everything below runs with anonymous access switched off.
    with TestFeature(self, 'ANONYMOUS_ACCESS', False):
      # Create the repository under the public user.
      self._push_new_repo('public', 'public', 'password')
      self.clearSession()

      # The repo is still private and owned by public, so devtable must be refused.
      self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

      # Flip the repository to public.
      self._make_new_repo_public()

      # With the repository public, devtable can pull it.
      self.do_pull('public', 'newrepo', 'devtable', 'password')

  def test_private_no_anonymous_access(self):
    # Everything below runs with anonymous access switched off.
    with TestFeature(self, 'ANONYMOUS_ACCESS', False):
      # Create the repository under the public user.
      self._push_new_repo('public', 'public', 'password')
      self.clearSession()

      # The repo is still private and owned by public, so devtable must be refused.
      self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

      # The owner (public) can pull it.
      self.do_pull('public', 'newrepo', 'public', 'password')

  def test_public_no_anonymous_access_no_auth(self):
    # Everything below runs with anonymous access switched off.
    with TestFeature(self, 'ANONYMOUS_ACCESS', False):
      # Create the repository under the public user.
      self._push_new_repo('public', 'public', 'password')
      self.clearSession()

      # The repo is still private, so an anonymous pull must be refused.
      self.do_pull('public', 'newrepo', expected_code=401)

      # Flip the repository to public.
      self._make_new_repo_public()

      # Even though the repo is now public, the anonymous pull must still be refused, because
      # anonymous access is disabled.
      self.do_pull('public', 'newrepo', expected_code=401)

      # The owner (public) can pull it.
      self.do_pull('public', 'newrepo', 'public', 'password')

      # devtable can pull it as well, since the repository is public.
      self.do_pull('public', 'newrepo', 'devtable', 'password')

  def test_create_repo_creator_user(self):
    self._push_new_repo('buynlarge', 'creator', 'password')

    # The repository belongs to the org, so devtable is able to pull it.
    self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

  def test_create_repo_robot_owner(self):
    # The robot's token serves as its password.
    robot_token = self._fetch_robot_token('/api/v1/organization/buynlarge/robots/ownerbot')

    self._push_new_repo('buynlarge', 'buynlarge+ownerbot', robot_token)

    # The repository belongs to the org, so devtable is able to pull it.
    self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')

  def test_create_repo_robot_creator(self):
    # The robot's token serves as its password.
    robot_token = self._fetch_robot_token('/api/v1/organization/buynlarge/robots/creatorbot')

    self._push_new_repo('buynlarge', 'buynlarge+creatorbot', robot_token)

    # The repository belongs to the org, so devtable is able to pull it.
    self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
# Run the tests in this module through unittest when the file is executed directly.
if __name__ == '__main__':
  unittest.main()
 |