458 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			458 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import requests
 | |
| 
 | |
| from flask import request, jsonify
 | |
| from flask.blueprints import Blueprint
 | |
| from flask.ext.testing import LiveServerTestCase
 | |
| 
 | |
| from app import app
 | |
| from data.database import close_db_filter, configure
 | |
| from endpoints.v1 import v1_bp
 | |
| from endpoints.api import api_bp
 | |
| from initdb import wipe_database, initialize_database, populate_database
 | |
| from endpoints.csrf import generate_csrf_token
 | |
| from tempfile import NamedTemporaryFile
 | |
| 
 | |
| import endpoints.decorated
 | |
| import json
 | |
| import features
 | |
| 
 | |
| import tarfile
 | |
| import shutil
 | |
| 
 | |
| from cStringIO import StringIO
 | |
| from digest.checksums import compute_simple
 | |
| 
 | |
| try:
 | |
|   app.register_blueprint(v1_bp, url_prefix='/v1')
 | |
|   app.register_blueprint(api_bp, url_prefix='/api')
 | |
| except ValueError:
 | |
|   # Blueprint was already registered
 | |
|   pass
 | |
| 
 | |
| 
 | |
| # Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the
 | |
| # DB connection.
 | |
| 
 | |
| testbp = Blueprint('testbp', __name__)
 | |
| 
 | |
| @testbp.route('/csrf', methods=['GET'])
 | |
| def generate_csrf():
 | |
|   return generate_csrf_token()
 | |
| 
 | |
| @testbp.route('/feature/<feature_name>', methods=['POST'])
 | |
| def set_feature(feature_name):
 | |
|   import features
 | |
|   old_value = features._FEATURES[feature_name].value
 | |
|   features._FEATURES[feature_name].value = request.get_json()['value']
 | |
|   return jsonify({'old_value': old_value})
 | |
| 
 | |
| @testbp.route('/reloaddb', methods=['POST'])
 | |
| def reload_db():
 | |
|   # Close any existing connection.
 | |
|   close_db_filter(None)
 | |
| 
 | |
|   # Reload the database config.
 | |
|   configure(app.config)
 | |
|   return 'OK'
 | |
| 
 | |
| app.register_blueprint(testbp, url_prefix='/__test')
 | |
| 
 | |
| 
 | |
| class TestFeature(object):
 | |
|   """ Helper object which temporarily sets the value of a feature flag.
 | |
|   """
 | |
| 
 | |
|   def __init__(self, test_case, feature_flag, test_value):
 | |
|     self.test_case = test_case
 | |
|     self.feature_flag = feature_flag
 | |
|     self.test_value = test_value
 | |
|     self.old_value = None
 | |
| 
 | |
|   def __enter__(self):
 | |
|     result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
 | |
|                                     data=json.dumps(dict(value=self.test_value)),
 | |
|                                     headers={'Content-Type': 'application/json'})
 | |
| 
 | |
|     result_data = json.loads(result.text)
 | |
|     self.old_value = result_data['old_value']
 | |
| 
 | |
|   def __exit__(self, type, value, traceback):
 | |
|     self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
 | |
|                            data=json.dumps(dict(value=self.old_value)),
 | |
|                            headers={'Content-Type': 'application/json'})
 | |
| 
 | |
| _PORT_NUMBER = 5001
 | |
| _CLEAN_DATABASE_PATH = None
 | |
| 
 | |
| def get_new_database_uri():
 | |
|   # If a clean copy of the database has not yet been created, create one now.
 | |
|   global _CLEAN_DATABASE_PATH
 | |
|   if not _CLEAN_DATABASE_PATH:
 | |
|     wipe_database()
 | |
|     initialize_database()
 | |
|     populate_database()
 | |
|     close_db_filter(None)
 | |
| 
 | |
|     # Save the path of the clean database.
 | |
|     _CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name
 | |
| 
 | |
|   # Create a new temp file to be used as the actual backing database for the test.
 | |
|   # Note that we have the close() the file to ensure we can copy to it via shutil.
 | |
|   local_db_file = NamedTemporaryFile(delete=True)
 | |
|   local_db_file.close()
 | |
| 
 | |
|   # Copy the clean database to the path.
 | |
|   shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name)
 | |
|   return 'sqlite:///{0}'.format(local_db_file.name)
 | |
| 
 | |
| class RegistryTestCase(LiveServerTestCase):
 | |
|   maxDiff = None
 | |
| 
 | |
|   def create_app(self):
 | |
|     global _PORT_NUMBER
 | |
|     _PORT_NUMBER = _PORT_NUMBER + 1
 | |
|     app.config['TESTING'] = True
 | |
|     app.config['LIVESERVER_PORT'] = _PORT_NUMBER
 | |
|     app.config['DB_URI'] = get_new_database_uri()
 | |
|     return app
 | |
| 
 | |
|   def setUp(self):
 | |
|     self.clearSession()
 | |
| 
 | |
|     # Tell the remote running app to reload the database. By default, the app forks from the
 | |
|     # current context and has already loaded the DB config with the *original* DB URL. We call
 | |
|     # the remote reload method to force it to pick up the changes to DB_URI set in the create_app
 | |
|     # method.
 | |
|     self.conduct('POST', '/__test/reloaddb')
 | |
| 
 | |
|   def clearSession(self):
 | |
|     self.session = requests.Session()
 | |
|     self.signature = None
 | |
|     self.docker_token = 'true'
 | |
| 
 | |
|     # Load the CSRF token.
 | |
|     self.csrf_token = ''
 | |
|     self.csrf_token = self.conduct('GET', '/__test/csrf').text
 | |
| 
 | |
|   def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
 | |
|     headers = headers or {}
 | |
|     headers['X-Docker-Token'] = self.docker_token
 | |
| 
 | |
|     if self.signature and not auth:
 | |
|       headers['Authorization'] = 'token ' + self.signature
 | |
| 
 | |
|     response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
 | |
|                                     auth=auth, params=dict(_csrf_token=self.csrf_token))
 | |
|     if response.status_code != expected_code:
 | |
|       print response.text
 | |
| 
 | |
|     if 'www-authenticate' in response.headers:
 | |
|       self.signature = response.headers['www-authenticate']
 | |
| 
 | |
|     if 'X-Docker-Token' in response.headers:
 | |
|       self.docker_token = response.headers['X-Docker-Token']
 | |
| 
 | |
|     self.assertEquals(response.status_code, expected_code)
 | |
|     return response
 | |
| 
 | |
|   def ping(self):
 | |
|     self.conduct('GET', '/v1/_ping')
 | |
| 
 | |
|   def do_login(self, username, password='password'):
 | |
|     self.ping()
 | |
|     result = self.conduct('POST', '/v1/users/',
 | |
|                            data=json.dumps(dict(username=username, password=password,
 | |
|                                                 email='bar@example.com')),
 | |
|                            headers={"Content-Type": "application/json"},
 | |
|                            expected_code=400)
 | |
| 
 | |
|     self.assertEquals(result.text, '"Username or email already exists"')
 | |
|     self.conduct('GET', '/v1/users/', auth=(username, password))
 | |
| 
 | |
|   def do_push(self, namespace, repository, username, password, images, expected_code=201):
 | |
|     auth = (username, password)
 | |
| 
 | |
|     # Ping!
 | |
|     self.ping()
 | |
| 
 | |
|     # PUT /v1/repositories/{namespace}/{repository}/
 | |
|     data = [{"id": image['id']} for image in images]
 | |
|     self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
 | |
|                  data=json.dumps(data), auth=auth,
 | |
|                  expected_code=expected_code)
 | |
| 
 | |
|     if expected_code != 201:
 | |
|       return
 | |
| 
 | |
|     for image in images:
 | |
|       # PUT /v1/images/{imageID}/json
 | |
|       self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))
 | |
| 
 | |
|       # PUT /v1/images/{imageID}/layer
 | |
|       tar_file_info = tarfile.TarInfo(name='image_name')
 | |
|       tar_file_info.type = tarfile.REGTYPE
 | |
|       tar_file_info.size = len(image['id'])
 | |
| 
 | |
|       layer_data = StringIO()
 | |
| 
 | |
|       tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
 | |
|       tar_file.addfile(tar_file_info, StringIO(image['id']))
 | |
|       tar_file.close()
 | |
| 
 | |
|       layer_bytes = layer_data.getvalue()
 | |
|       layer_data.close()
 | |
| 
 | |
|       self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))
 | |
| 
 | |
|       # PUT /v1/images/{imageID}/checksum
 | |
|       checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
 | |
|       self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
 | |
|                    headers={'X-Docker-Checksum-Payload': checksum})
 | |
| 
 | |
| 
 | |
|     # PUT /v1/repositories/{namespace}/{repository}/tags/latest
 | |
|     self.do_tag(namespace, repository, 'latest', images[0]['id'])
 | |
| 
 | |
|     # PUT /v1/repositories/{namespace}/{repository}/images
 | |
|     self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
 | |
|                  expected_code=204)
 | |
| 
 | |
| 
 | |
|   def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
 | |
|     auth = None
 | |
|     if username:
 | |
|       auth = (username, password)
 | |
| 
 | |
|     # Ping!
 | |
|     self.ping()
 | |
| 
 | |
|     prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
 | |
| 
 | |
|     # GET /v1/repositories/{namespace}/{repository}/
 | |
|     self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
 | |
|     if expected_code != 200:
 | |
|       # Push was expected to fail, so nothing more to do for the push.
 | |
|       return
 | |
| 
 | |
|     # GET /v1/repositories/{namespace}/{repository}/
 | |
|     result = json.loads(self.conduct('GET', prefix + 'tags').text)
 | |
| 
 | |
|     for image_id in result.values():
 | |
|       # /v1/images/{imageID}/{ancestry, json, layer}
 | |
|       image_prefix = '/v1/images/%s/' % image_id
 | |
|       self.conduct('GET', image_prefix + 'ancestry')
 | |
|       self.conduct('GET', image_prefix + 'json')
 | |
|       self.conduct('GET', image_prefix + 'layer')
 | |
| 
 | |
|   def do_tag(self, namespace, repository, tag, image_id, expected_code=200):
 | |
|     self.conduct('PUT', '/v1/repositories/%s/%s/tags/%s' % (namespace, repository, tag),
 | |
|                  data='"%s"' % image_id, expected_code=expected_code)
 | |
| 
 | |
|   def conduct_api_login(self, username, password):
 | |
|     self.conduct('POST', '/api/v1/signin',
 | |
|                  data=json.dumps(dict(username=username, password=password)),
 | |
|                  headers={'Content-Type': 'application/json'})
 | |
| 
 | |
|   def change_repo_visibility(self, repository, namespace, visibility):
 | |
|     self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
 | |
|                  data=json.dumps(dict(visibility=visibility)),
 | |
|                  headers={'Content-Type': 'application/json'})
 | |
| 
 | |
| 
 | |
| class RegistryTests(RegistryTestCase):
 | |
|   def test_push_reponame_with_slashes(self):
 | |
|     # Attempt to add a repository name with slashes. This should fail as we do not support it.
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expected_code=400)
 | |
| 
 | |
|   def test_pull_publicrepo_anonymous(self):
 | |
|     # Add a new repository under the public user, so we have a real repository to pull.
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|     self.clearSession()
 | |
| 
 | |
|     # First try to pull the (currently private) repo anonymously, which should fail (since it is
 | |
|     # private)
 | |
|     self.do_pull('public', 'newrepo', expected_code=403)
 | |
| 
 | |
|     # Make the repository public.
 | |
|     self.conduct_api_login('public', 'password')
 | |
|     self.change_repo_visibility('public', 'newrepo', 'public')
 | |
|     self.clearSession()
 | |
| 
 | |
|     # Pull the repository anonymously, which should succeed because the repository is public.
 | |
|     self.do_pull('public', 'newrepo')
 | |
| 
 | |
| 
 | |
|   def test_pull_publicrepo_devtable(self):
 | |
|     # Add a new repository under the public user, so we have a real repository to pull.
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|     self.clearSession()
 | |
| 
 | |
|     # First try to pull the (currently private) repo as devtable, which should fail as it belongs
 | |
|     # to public.
 | |
|     self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
 | |
| 
 | |
|     # Make the repository public.
 | |
|     self.conduct_api_login('public', 'password')
 | |
|     self.change_repo_visibility('public', 'newrepo', 'public')
 | |
|     self.clearSession()
 | |
| 
 | |
|     # Pull the repository as devtable, which should succeed because the repository is public.
 | |
|     self.do_pull('public', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_pull_private_repo(self):
 | |
|     # Add a new repository under the devtable user, so we have a real repository to pull.
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
 | |
|     self.clearSession()
 | |
| 
 | |
|     # First try to pull the (currently private) repo as public, which should fail as it belongs
 | |
|     # to devtable.
 | |
|     self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
 | |
| 
 | |
|     # Pull the repository as devtable, which should succeed because the repository is owned by
 | |
|     # devtable.
 | |
|     self.do_pull('devtable', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_public_no_anonymous_access_with_auth(self):
 | |
|     # Turn off anonymous access.
 | |
|     with TestFeature(self, 'ANONYMOUS_ACCESS', False):
 | |
|       # Add a new repository under the public user, so we have a real repository to pull.
 | |
|       images = [{
 | |
|         'id': 'onlyimagehere'
 | |
|       }]
 | |
|       self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|       self.clearSession()
 | |
| 
 | |
|       # First try to pull the (currently private) repo as devtable, which should fail as it belongs
 | |
|       # to public.
 | |
|       self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
 | |
| 
 | |
|       # Make the repository public.
 | |
|       self.conduct_api_login('public', 'password')
 | |
|       self.change_repo_visibility('public', 'newrepo', 'public')
 | |
|       self.clearSession()
 | |
| 
 | |
|       # Pull the repository as devtable, which should succeed because the repository is public.
 | |
|       self.do_pull('public', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_private_no_anonymous_access(self):
 | |
|     # Turn off anonymous access.
 | |
|     with TestFeature(self, 'ANONYMOUS_ACCESS', False):
 | |
|       # Add a new repository under the public user, so we have a real repository to pull.
 | |
|       images = [{
 | |
|         'id': 'onlyimagehere'
 | |
|       }]
 | |
|       self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|       self.clearSession()
 | |
| 
 | |
|       # First try to pull the (currently private) repo as devtable, which should fail as it belongs
 | |
|       # to public.
 | |
|       self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
 | |
| 
 | |
|       # Pull the repository as public, which should succeed because the repository is owned by public.
 | |
|       self.do_pull('public', 'newrepo', 'public', 'password')
 | |
| 
 | |
| 
 | |
|   def test_public_no_anonymous_access_no_auth(self):
 | |
|     # Turn off anonymous access.
 | |
|     with TestFeature(self, 'ANONYMOUS_ACCESS', False):
 | |
|       # Add a new repository under the public user, so we have a real repository to pull.
 | |
|       images = [{
 | |
|         'id': 'onlyimagehere'
 | |
|       }]
 | |
|       self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|       self.clearSession()
 | |
| 
 | |
|       # First try to pull the (currently private) repo as anonymous, which should fail as it
 | |
|       # is private.
 | |
|       self.do_pull('public', 'newrepo', expected_code=401)
 | |
| 
 | |
|       # Make the repository public.
 | |
|       self.conduct_api_login('public', 'password')
 | |
|       self.change_repo_visibility('public', 'newrepo', 'public')
 | |
|       self.clearSession()
 | |
| 
 | |
|       # Try again to pull the (currently public) repo as anonymous, which should fail as
 | |
|       # anonymous access is disabled.
 | |
|       self.do_pull('public', 'newrepo', expected_code=401)
 | |
| 
 | |
|       # Pull the repository as public, which should succeed because the repository is owned by public.
 | |
|       self.do_pull('public', 'newrepo', 'public', 'password')
 | |
| 
 | |
|       # Pull the repository as devtable, which should succeed because the repository is public.
 | |
|       self.do_pull('public', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_create_repo_creator_user(self):
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('buynlarge', 'newrepo', 'creator', 'password', images)
 | |
| 
 | |
|     # Pull the repository as devtable, which should succeed because the repository is owned by the
 | |
|     # org.
 | |
|     self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_create_repo_robot_owner(self):
 | |
|     # Lookup the robot's password.
 | |
|     self.conduct_api_login('devtable', 'password')
 | |
|     resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
 | |
|     robot_token = json.loads(resp.text)['token']
 | |
| 
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images)
 | |
| 
 | |
|     # Pull the repository as devtable, which should succeed because the repository is owned by the
 | |
|     # org.
 | |
|     self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
| 
 | |
|   def test_create_repo_robot_creator(self):
 | |
|     # Lookup the robot's password.
 | |
|     self.conduct_api_login('devtable', 'password')
 | |
|     resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
 | |
|     robot_token = json.loads(resp.text)['token']
 | |
| 
 | |
|     images = [{
 | |
|       'id': 'onlyimagehere'
 | |
|     }]
 | |
|     self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images)
 | |
| 
 | |
|     # Pull the repository as devtable, which should succeed because the repository is owned by the
 | |
|     # org.
 | |
|     self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
 | |
| 
 | |
|   def test_tag_validation(self):
 | |
|     image_id = 'onlyimagehere'
 | |
|     images = [{
 | |
|       'id': image_id
 | |
|     }]
 | |
|     self.do_push('public', 'newrepo', 'public', 'password', images)
 | |
|     self.do_tag('public', 'newrepo', '1', image_id)
 | |
|     self.do_tag('public', 'newrepo', 'x' * 128, image_id)
 | |
|     self.do_tag('public', 'newrepo', '', image_id, expected_code=400)
 | |
|     self.do_tag('public', 'newrepo', 'x' * 129, image_id, expected_code=400)
 | |
|     self.do_tag('public', 'newrepo', '.fail', image_id, expected_code=400)
 | |
|     self.do_tag('public', 'newrepo', '-fail', image_id, expected_code=400)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|   unittest.main()
 |