import unittest import requests import os import math import random import string import resumablehashlib import binascii import Crypto.Random from cachetools import lru_cache from flask import request, jsonify from flask.blueprints import Blueprint from flask.ext.testing import LiveServerTestCase from cryptography.x509 import load_pem_x509_certificate from cryptography.hazmat.backends import default_backend from app import app, storage from data.database import close_db_filter, configure from data import model from endpoints.v1 import v1_bp from endpoints.v2 import v2_bp from endpoints.verbs import verbs from endpoints.v2.manifest import SignedManifestBuilder from endpoints.api import api_bp from initdb import wipe_database, initialize_database, populate_database from endpoints.csrf import generate_csrf_token from tempfile import NamedTemporaryFile from jsonschema import validate as validate_schema from util.security import strictjwt import endpoints.decorated import json import features import hashlib import logging import bencode import tarfile import shutil from jwkest.jws import SIGNER_ALGS from jwkest.jwk import RSAKey from Crypto.PublicKey import RSA from cStringIO import StringIO from digest.checksums import compute_simple try: app.register_blueprint(v1_bp, url_prefix='/v1') app.register_blueprint(v2_bp, url_prefix='/v2') app.register_blueprint(verbs, url_prefix='/c1') app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # Blueprint was already registered pass # Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the # DB connection. testbp = Blueprint('testbp', __name__) logger = logging.getLogger(__name__) @testbp.route('/csrf', methods=['GET']) def generate_csrf(): return generate_csrf_token() @testbp.route('/fakestoragedd/', methods=['POST']) def set_fakestorage_directdownload(enabled): storage.put_content(['local_us'], 'supports_direct_download', enabled) return 'OK' @testbp.route('/feature/', methods=['POST']) def set_feature(feature_name): import features old_value = features._FEATURES[feature_name].value features._FEATURES[feature_name].value = request.get_json()['value'] return jsonify({'old_value': old_value}) @testbp.route('/removeuncompressed/', methods=['POST']) def removeuncompressed(image_id): image = model.image.get_image_by_id('devtable', 'newrepo', image_id) image.storage.uncompressed_size = None image.storage.save() return 'OK' @testbp.route('/addtoken', methods=['POST']) def addtoken(): another_token = model.token.create_delegate_token('devtable', 'newrepo', 'my-new-token', 'write') another_token.code = 'somecooltokencode' another_token.save() return 'OK' @testbp.route('/reloadapp', methods=['POST']) def reload_app(): # Close any existing connection. close_db_filter(None) # Reload the database config. configure(app.config) # Reload random after the process split, as it cannot be used uninitialized across forks. Crypto.Random.atfork() return 'OK' app.register_blueprint(testbp, url_prefix='/__test') class TestFeature(object): """ Helper object which temporarily sets the value of a feature flag. """ def __init__(self, test_case, feature_flag, test_value): self.test_case = test_case self.feature_flag = feature_flag self.test_value = test_value self.old_value = None def __enter__(self): result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.test_value)), headers={'Content-Type': 'application/json'}) result_data = json.loads(result.text) self.old_value = result_data['old_value'] def __exit__(self, type, value, traceback): self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag, data=json.dumps(dict(value=self.old_value)), headers={'Content-Type': 'application/json'}) _PORT_NUMBER = 5001 _CLEAN_DATABASE_PATH = None _JWK = RSAKey(key=RSA.generate(2048)) class FailureCodes: """ Defines tuples representing the HTTP status codes for various errors. The tuple is defined as ('errordescription', V1HTTPStatusCode, V2HTTPStatusCode). """ UNAUTHENTICATED = ('unauthenticated', 401, 401) UNAUTHORIZED = ('unauthorized', 403, 401) INVALID_REGISTRY = ('invalidregistry', 404, 404) DOES_NOT_EXIST = ('doesnotexist', 404, 404) INVALID_REQUEST = ('invalidrequest', 400, 400) def _get_expected_code(expected_failure, version, success_status_code): """ Returns the HTTP status code for the expected failure under the specified protocol version (1 or 2). If none, returns the success status code. """ if not expected_failure: return success_status_code return expected_failure[version] def _get_repo_name(namespace, name): if namespace == '': return name return '%s/%s' % (namespace, name) def _get_full_contents(image_data): if 'chunks' in image_data: # Data is just for chunking; no need for a real TAR. return image_data['contents'] layer_data = StringIO() tar_file_info = tarfile.TarInfo(name='contents') tar_file_info.type = tarfile.REGTYPE tar_file_info.size = len(image_data['contents']) tar_file = tarfile.open(fileobj=layer_data, mode='w|gz') tar_file.addfile(tar_file_info, StringIO(image_data['contents'])) tar_file.close() layer_bytes = layer_data.getvalue() layer_data.close() return layer_bytes def get_new_database_uri(): # If a clean copy of the database has not yet been created, create one now. global _CLEAN_DATABASE_PATH if not _CLEAN_DATABASE_PATH: wipe_database() initialize_database() populate_database() close_db_filter(None) # Save the path of the clean database. _CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name # Create a new temp file to be used as the actual backing database for the test. # Note that we have the close() the file to ensure we can copy to it via shutil. local_db_file = NamedTemporaryFile(delete=True) local_db_file.close() # Copy the clean database to the path. shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name) return 'sqlite:///{0}'.format(local_db_file.name) class RegistryTestCaseMixin(LiveServerTestCase): def create_app(self): global _PORT_NUMBER _PORT_NUMBER = _PORT_NUMBER + 1 if os.environ.get('DEBUG') == 'true': app.config['DEBUG'] = True app.config['TESTING'] = True app.config['LIVESERVER_PORT'] = _PORT_NUMBER app.config['DB_URI'] = get_new_database_uri() return app def setUp(self): self.clearSession() # Tell the remote running app to reload the database and app. By default, the app forks from the # current context and has already loaded the DB config with the *original* DB URL. We call # the remote reload method to force it to pick up the changes to DB_URI set in the create_app # method. self.conduct('POST', '/__test/reloadapp') def clearSession(self): self.session = requests.Session() self.signature = None self.docker_token = 'true' self.jwt = None # Load the CSRF token. self.csrf_token = '' self.csrf_token = self.conduct('GET', '/__test/csrf').text def do_tag(self, namespace, repository, tag, image_id, expected_code=200): repo_name = _get_repo_name(namespace, repository) self.conduct('PUT', '/v1/repositories/%s/tags/%s' % (repo_name, tag), data='"%s"' % image_id, expected_code=expected_code, auth='sig') def conduct_api_login(self, username, password): self.conduct('POST', '/api/v1/signin', data=json.dumps(dict(username=username, password=password)), headers={'Content-Type': 'application/json'}) def change_repo_visibility(self, namespace, repository, visibility): repo_name = _get_repo_name(namespace, repository) self.conduct('POST', '/api/v1/repository/%s/changevisibility' % repo_name, data=json.dumps(dict(visibility=visibility)), headers={'Content-Type': 'application/json'}) class BaseRegistryMixin(object): def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200, json_data=None, user_agent=None): params = params or {} params['_csrf_token'] = self.csrf_token headers = headers or {} auth_tuple = None if user_agent is not None: headers['User-Agent'] = user_agent else: headers['User-Agent'] = 'docker/1.9.1' if self.docker_token: headers['X-Docker-Token'] = self.docker_token if auth == 'sig': if self.signature: headers['Authorization'] = 'token ' + self.signature elif auth == 'jwt': if self.jwt: headers['Authorization'] = 'Bearer ' + self.jwt elif auth: auth_tuple = auth if json_data is not None: data = json.dumps(json_data) headers['Content-Type'] = 'application/json' response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data, auth=auth_tuple, params=params) if response.status_code != expected_code: print response.text if 'www-authenticate' in response.headers: self.signature = response.headers['www-authenticate'] if 'X-Docker-Token' in response.headers: self.docker_token = response.headers['X-Docker-Token'] self.assertEquals(response.status_code, expected_code) return response def _get_default_images(self): return [{'id': 'someid', 'contents': 'somecontent'}] class V1RegistryMixin(BaseRegistryMixin): def v1_ping(self): self.conduct('GET', '/v1/_ping') class V1RegistryPushMixin(V1RegistryMixin): def do_push(self, namespace, repository, username, password, images=None, expect_failure=None): images = images or self._get_default_images() auth = (username, password) repo_name = _get_repo_name(namespace, repository) # Ping! self.v1_ping() # PUT /v1/repositories/{namespace}/{repository}/ expected_code = _get_expected_code(expect_failure, 1, 201) self.conduct('PUT', '/v1/repositories/%s/' % repo_name, data=json.dumps(images), auth=auth, expected_code=expected_code) if expected_code != 201: return last_image_id = None for image_data in images: image_id = image_data['id'] last_image_id = image_id # PUT /v1/images/{imageID}/json image_json_data = {'id': image_id} if 'size' in image_data: image_json_data['Size'] = image_data['size'] self.conduct('PUT', '/v1/images/%s/json' % image_id, data=json.dumps(image_json_data), auth='sig') # PUT /v1/images/{imageID}/layer layer_bytes = _get_full_contents(image_data) self.conduct('PUT', '/v1/images/%s/layer' % image_id, data=StringIO(layer_bytes), auth='sig') # PUT /v1/images/{imageID}/checksum checksum = compute_simple(StringIO(layer_bytes), json.dumps(image_json_data)) self.conduct('PUT', '/v1/images/%s/checksum' % image_id, headers={'X-Docker-Checksum-Payload': checksum}, auth='sig') # PUT /v1/repositories/{namespace}/{repository}/tags/latest self.do_tag(namespace, repository, 'latest', images[0]['id']) # PUT /v1/repositories/{namespace}/{repository}/images self.conduct('PUT', '/v1/repositories/%s/images' % repo_name, expected_code=204, auth='sig') class V1RegistryPullMixin(V1RegistryMixin): def do_pull(self, namespace, repository, username=None, password='password', expect_failure=None, images=None): images = images or self._get_default_images() repo_name = _get_repo_name(namespace, repository) auth = None if username: auth = (username, password) # Ping! self.v1_ping() prefix = '/v1/repositories/%s/' % repo_name # GET /v1/repositories/{namespace}/{repository}/ expected_code = _get_expected_code(expect_failure, 1, 200) self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code) if expected_code != 200: return # GET /v1/repositories/{namespace}/{repository}/ result = json.loads(self.conduct('GET', prefix + 'tags', auth='sig').text) self.assertEquals(len(images), len(result.values())) for image_data in images: image_id = image_data['id'] self.assertIn(image_id, result.values()) # /v1/images/{imageID}/{ancestry, json, layer} image_prefix = '/v1/images/%s/' % image_id self.conduct('GET', image_prefix + 'ancestry', auth='sig') self.conduct('GET', image_prefix + 'json', auth='sig') self.conduct('GET', image_prefix + 'layer', auth='sig') class V2RegistryMixin(BaseRegistryMixin): MANIFEST_SCHEMA = { 'type': 'object', 'properties': { 'name': { 'type': 'string', }, 'tag': { 'type': 'string', }, 'signatures': { 'type': 'array', 'itemType': { 'type': 'object', }, }, 'fsLayers': { 'type': 'array', 'itemType': { 'type': 'object', 'properties': { 'blobSum': { 'type': 'string', }, }, 'required': 'blobSum', }, }, 'history': { 'type': 'array', 'itemType': { 'type': 'object', 'properties': { 'v1Compatibility': { 'type': 'object', }, }, 'required': ['v1Compatibility'], }, }, }, 'required': ['name', 'tag', 'fsLayers', 'history', 'signatures'], } def v2_ping(self): response = self.conduct('GET', '/v2/', expected_code=200 if self.jwt else 401, auth='jwt') self.assertEquals(response.headers['Docker-Distribution-API-Version'], 'registry/2.0') def do_auth(self, username, password, namespace, repository, expected_code=200, scopes=[]): auth = None if username and password: auth = (username, password) repo_name = _get_repo_name(namespace, repository) params = { 'account': username, 'scope': 'repository:%s:%s' % (repo_name, ','.join(scopes)), 'service': app.config['SERVER_HOSTNAME'], } response = self.conduct('GET', '/v2/auth', params=params, auth=auth, expected_code=expected_code) if expected_code == 200: response_json = json.loads(response.text) self.assertIsNotNone(response_json.get('token')) self.jwt = response_json['token'] return response class V2RegistryPushMixin(V2RegistryMixin): def do_push(self, namespace, repository, username, password, images=None, tag_name=None, cancel=False, invalid=False, expect_failure=None, scopes=None): images = images or self._get_default_images() repo_name = _get_repo_name(namespace, repository) # Ping! self.v2_ping() # Auth. If the expected failure is an invalid registry, in V2 we'll receive that error from # the auth endpoint first, rather than just the V2 requests below. expected_auth_code = 200 if expect_failure == FailureCodes.INVALID_REGISTRY: expected_auth_code = 400 self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'], expected_code=expected_auth_code) if expected_auth_code != 200: return # Build a fake manifest. tag_name = tag_name or 'latest' builder = SignedManifestBuilder(namespace, repository, tag_name) full_contents = {} for image_data in images: full_contents[image_data['id']] = _get_full_contents(image_data) checksum = 'sha256:' + hashlib.sha256(full_contents[image_data['id']]).hexdigest() if invalid: checksum = 'sha256:' + hashlib.sha256('foobarbaz').hexdigest() builder.add_layer(checksum, json.dumps(image_data)) expected_code = _get_expected_code(expect_failure, 2, 404) # Build the manifest. manifest = builder.build(_JWK) # Push the image's layers. checksums = {} for image_data in images: image_id = image_data['id'] layer_bytes = full_contents[image_data['id']] chunks = image_data.get('chunks') # Layer data should not yet exist. checksum = 'sha256:' + hashlib.sha256(layer_bytes).hexdigest() self.conduct('HEAD', '/v2/%s/blobs/%s' % (repo_name, checksum), expected_code=404, auth='jwt') # If we expected a non-404 status code, then the HEAD operation has failed and we cannot # continue performing the push. if expected_code != 404: return # Start a new upload of the layer data. response = self.conduct('POST', '/v2/%s/blobs/uploads/' % repo_name, expected_code=202, auth='jwt') upload_uuid = response.headers['Docker-Upload-UUID'] location = response.headers['Location'][len(self.get_server_url()):] # PATCH the image data into the layer. if chunks is None: self.conduct('PATCH', location, data=layer_bytes, expected_code=204, auth='jwt') else: for chunk in chunks: if len(chunk) == 3: (start_byte, end_byte, expected_code) = chunk else: (start_byte, end_byte) = chunk expected_code = 204 contents_chunk = layer_bytes[start_byte:end_byte] self.conduct('PATCH', location, data=contents_chunk, expected_code=expected_code, auth='jwt', headers={'Range': 'bytes=%s-%s' % (start_byte, end_byte)}) if expected_code != 204: return # Retrieve the upload status at each point. status_url = '/v2/%s/blobs/uploads/%s' % (repo_name, upload_uuid) response = self.conduct('GET', status_url, expected_code=204, auth='jwt', headers=dict(host=self.get_server_url())) self.assertEquals(response.headers['Docker-Upload-UUID'], upload_uuid) self.assertEquals(response.headers['Range'], "bytes=0-%s" % end_byte) if cancel: self.conduct('DELETE', location, params=dict(digest=checksum), expected_code=204, auth='jwt') # Ensure the upload was canceled. status_url = '/v2/%s/blobs/uploads/%s' % (repo_name, upload_uuid) self.conduct('GET', status_url, expected_code=404, auth='jwt', headers=dict(host=self.get_server_url())) return # Finish the layer upload with a PUT. response = self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201, auth='jwt') self.assertEquals(response.headers['Docker-Content-Digest'], checksum) checksums[image_id] = checksum # Ensure the layer exists now. response = self.conduct('HEAD', '/v2/%s/blobs/%s' % (repo_name, checksum), expected_code=200, auth='jwt') self.assertEquals(response.headers['Docker-Content-Digest'], checksum) self.assertEquals(response.headers['Content-Length'], str(len(layer_bytes))) # Write the manifest. If we expect it to be invalid, we expect a 404 code. Otherwise, we expect # a 202 response for success. put_code = 404 if invalid else 202 self.conduct('PUT', '/v2/%s/manifests/%s' % (repo_name, tag_name), data=manifest.bytes, expected_code=put_code, headers={'Content-Type': 'application/json'}, auth='jwt') return checksums, manifest.digest class V2RegistryPullMixin(V2RegistryMixin): def do_pull(self, namespace, repository, username=None, password='password', expect_failure=None, manifest_id=None, images=None): images = images or self._get_default_images() repo_name = _get_repo_name(namespace, repository) # Ping! self.v2_ping() # Auth. If the failure expected is unauthenticated, then the auth endpoint will 401 before # we reach any of the registry operations. expected_auth_code = 200 if expect_failure == FailureCodes.UNAUTHENTICATED: expected_auth_code = 401 self.do_auth(username, password, namespace, repository, scopes=['pull'], expected_code=expected_auth_code) if expected_auth_code != 200: return # Retrieve the manifest for the tag or digest. manifest_id = manifest_id or 'latest' expected_code = _get_expected_code(expect_failure, 2, 200) response = self.conduct('GET', '/v2/%s/manifests/%s' % (repo_name, manifest_id), auth='jwt', expected_code=expected_code) if expected_code != 200: return manifest_data = json.loads(response.text) # Ensure the manifest returned by us is valid. validate_schema(manifest_data, V2RegistryMixin.MANIFEST_SCHEMA) # Verify the layers. blobs = {} for layer in manifest_data['fsLayers']: blob_id = layer['blobSum'] result = self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200, auth='jwt') blobs[blob_id] = result.content # Verify the V1 metadata is present for each expected image. found_v1_layers = set() history = manifest_data['history'] for entry in history: v1_history = json.loads(entry['v1Compatibility']) found_v1_layers.add(v1_history['id']) for image in images: self.assertIn(image['id'], found_v1_layers) return blobs class V1RegistryLoginMixin(object): def do_login(self, username, password, scope, expect_success=True): data = { 'username': username, 'password': password, } response = self.conduct('POST', '/v1/users/', json_data=data, expected_code=400) if expect_success: self.assertEquals(response.text, '"Username or email already exists"') else: self.assertNotEquals(response.text, '"Username or email already exists"') class V2RegistryLoginMixin(object): def do_login(self, username, password, scope, expect_success=True): params = { 'account': username, 'scope': scope, 'service': app.config['SERVER_HOSTNAME'], } if expect_success: expected_code = 200 else: expected_code = 401 auth = None if username and password: auth = (username, password) response = self.conduct('GET', '/v2/auth', params=params, auth=auth, expected_code=expected_code) return response class RegistryTestsMixin(object): def test_push_pull_logging(self): # Push a new repository. self.do_push('public', 'newrepo', 'public', 'password') # Retrieve the logs and ensure the push was added. self.conduct_api_login('public', 'password') result = self.conduct('GET', '/api/v1/repository/public/newrepo/logs') logs = result.json()['logs'] self.assertEquals(1, len(logs)) self.assertEquals('push_repo', logs[0]['kind']) self.assertEquals('public', logs[0]['performer']['name']) # Pull the repository. self.do_pull('public', 'newrepo', 'public', 'password') # Retrieve the logs and ensure the pull was added. result = self.conduct('GET', '/api/v1/repository/public/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('public', logs[0]['performer']['name']) def test_push_pull_logging_byrobot(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot') robot_token = json.loads(resp.text)['token'] # Push a new repository. self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Retrieve the logs and ensure the push was added. result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs') logs = result.json()['logs'] self.assertEquals(1, len(logs)) self.assertEquals('push_repo', logs[0]['kind']) self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name']) # Pull the repository. self.do_pull('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Retrieve the logs and ensure the pull was added. result = self.conduct('GET', '/api/v1/repository/buynlarge/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name']) def test_push_pull_logging_bytoken(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') # Add a token. self.conduct('POST', '/__test/addtoken') # Pull the repository. self.do_pull('devtable', 'newrepo', '$token', 'somecooltokencode') # Retrieve the logs and ensure the pull was added. self.conduct_api_login('devtable', 'password') result = self.conduct('GET', '/api/v1/repository/devtable/newrepo/logs') logs = result.json()['logs'] self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('my-new-token', logs[0]['metadata']['token']) def test_push_pull_logging_byoauth(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') # Pull the repository. self.do_pull('devtable', 'newrepo', '$oauthtoken', 'test') # Retrieve the logs and ensure the pull was added. self.conduct_api_login('devtable', 'password') result = self.conduct('GET', '/api/v1/repository/devtable/newrepo/logs') logs = result.json()['logs'] self.assertEquals(2, len(logs)) self.assertEquals('pull_repo', logs[0]['kind']) self.assertEquals('devtable', logs[0]['performer']['name']) self.assertEquals(1, logs[0]['metadata']['oauth_token_id']) def test_pull_publicrepo_anonymous(self): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo anonymously, which should fail (since it is # private) self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHORIZED) self.do_pull('public', 'newrepo', 'devtable', 'password', expect_failure=FailureCodes.UNAUTHORIZED) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository anonymously, which should succeed because the repository is public. self.do_pull('public', 'newrepo') def test_pull_publicrepo_devtable(self): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expect_failure=FailureCodes.UNAUTHORIZED) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_pull_private_repo(self): # Add a new repository under the devtable user, so we have a real repository to pull. self.do_push('devtable', 'newrepo', 'devtable', 'password') self.clearSession() # First try to pull the (currently private) repo as public, which should fail as it belongs # to devtable. self.do_pull('devtable', 'newrepo', 'public', 'password', expect_failure=FailureCodes.UNAUTHORIZED) # Pull the repository as devtable, which should succeed because the repository is owned by # devtable. self.do_pull('devtable', 'newrepo', 'devtable', 'password') def test_public_no_anonymous_access_with_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expect_failure=FailureCodes.UNAUTHORIZED) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_private_no_anonymous_access(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as devtable, which should fail as it belongs # to public. self.do_pull('public', 'newrepo', 'devtable', 'password', expect_failure=FailureCodes.UNAUTHORIZED) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') def test_public_no_anonymous_access_no_auth(self): # Turn off anonymous access. with TestFeature(self, 'ANONYMOUS_ACCESS', False): # Add a new repository under the public user, so we have a real repository to pull. self.do_push('public', 'newrepo', 'public', 'password') self.clearSession() # First try to pull the (currently private) repo as anonymous, which should fail as it # is private. self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHENTICATED) # Make the repository public. self.conduct_api_login('public', 'password') self.change_repo_visibility('public', 'newrepo', 'public') self.clearSession() # Try again to pull the (currently public) repo as anonymous, which should fail as # anonymous access is disabled. self.do_pull('public', 'newrepo', expect_failure=FailureCodes.UNAUTHENTICATED) # Pull the repository as public, which should succeed because the repository is owned by public. self.do_pull('public', 'newrepo', 'public', 'password') # Pull the repository as devtable, which should succeed because the repository is public. self.do_pull('public', 'newrepo', 'devtable', 'password') def test_create_repo_creator_user(self): self.do_push('buynlarge', 'newrepo', 'creator', 'password') # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_owner(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot') robot_token = json.loads(resp.text)['token'] self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_create_repo_robot_creator(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot') robot_token = json.loads(resp.text)['token'] self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token) # Pull the repository as devtable, which should succeed because the repository is owned by the # org. self.do_pull('buynlarge', 'newrepo', 'devtable', 'password') def test_library_repo(self): self.do_push('', 'newrepo', 'devtable', 'password') self.do_pull('', 'newrepo', 'devtable', 'password') self.do_pull('library', 'newrepo', 'devtable', 'password') def test_library_disabled(self): with TestFeature(self, 'LIBRARY_SUPPORT', False): self.do_push('library', 'newrepo', 'devtable', 'password') self.do_pull('library', 'newrepo', 'devtable', 'password') class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 registry. """ def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ 'id': 'onlyimagehere', 'contents': 'somecontents', }] self.do_push('public', 'newrepo/somesubrepo', 'public', 'password', images, expect_failure=FailureCodes.INVALID_REGISTRY) def test_push_unicode_metadata(self): self.conduct_api_login('devtable', 'password') images = [{ 'id': 'onlyimagehere', 'comment': 'Pawe\xc5\x82 Kami\xc5\x84ski '.decode('utf-8'), 'contents': 'somecontents', }] self.do_push('devtable', 'unicodetest', 'devtable', 'password', images) self.do_pull('devtable', 'unicodetest', 'devtable', 'password', images=images) def test_tag_validation(self): image_id = 'onlyimagehere' images = [{ 'id': image_id, 'contents': 'somecontents', }] self.do_push('public', 'newrepo', 'public', 'password', images) self.do_tag('public', 'newrepo', '1', image_id) self.do_tag('public', 'newrepo', 'x' * 128, image_id) self.do_tag('public', 'newrepo', '', image_id, expected_code=404) self.do_tag('public', 'newrepo', 'x' * 129, image_id, expected_code=400) self.do_tag('public', 'newrepo', '.fail', image_id, expected_code=400) self.do_tag('public', 'newrepo', '-fail', image_id, expected_code=400) class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V2 registry. """ def test_delete_manifest(self): # Push a new repo with the latest tag. (_, digest) = self.do_push('devtable', 'newrepo', 'devtable', 'password') # Ensure the pull works. self.do_pull('devtable', 'newrepo', 'devtable', 'password') # Conduct auth for the write scope. self.do_auth('devtable', 'password', 'devtable', 'newrepo', scopes=['push']) # Delete the digest. self.conduct('DELETE', '/v2/devtable/newrepo/manifests/' + digest, auth='jwt', expected_code=202) # Ensure the tag no longer exists. self.do_pull('devtable', 'newrepo', 'devtable', 'password', expect_failure=FailureCodes.DOES_NOT_EXIST) def test_push_only_push_scope(self): images = [{ 'id': 'onlyimagehere', 'contents': 'foobar', }] self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images, scopes=['push']) def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ 'id': 'onlyimagehere', 'contents': 'somecontents', }] self.do_push('public', 'newrepo/somesubrepo', 'devtable', 'password', images, expect_failure=FailureCodes.INVALID_REGISTRY) def test_invalid_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', invalid=True) def test_cancel_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True) def test_pull_by_checksum(self): # Add a new repository under the user, so we have a real repository to pull. _, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password') # Attempt to pull by digest. self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id=digest) def test_pull_invalid_image_tag(self): # Add a new repository under the user, so we have a real repository to pull. self.do_push('devtable', 'newrepo', 'devtable', 'password') self.clearSession() # Attempt to pull the invalid tag. self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='invalid', expect_failure=FailureCodes.INVALID_REGISTRY) def test_partial_upload_below_5mb(self): chunksize = 1024 * 1024 * 2 size = chunksize * 3 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunk_count = int(math.ceil((len(contents) * 1.0) / chunksize)) chunks = [(index * chunksize, (index + 1)*chunksize) for index in range(chunk_count)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_way_below_5mb(self): size = 1024 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (100, size)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_resend_below_5mb(self): size = 150 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (10, size)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Push the chunked upload. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the image back and verify the contents. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', images=images) self.assertEquals(len(blobs.items()), 1) self.assertEquals(blobs.items()[0][1], contents) def test_partial_upload_try_resend_with_gap(self): size = 150 contents = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size)) chunks = [(0, 100), (101, size, 416)] images = [ { 'id':'someid', 'contents': contents, 'chunks': chunks } ] # Attempt to push the chunked upload, which should fail. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) def test_multiple_layers_invalid(self): # Attempt to push a manifest with an image depending on an unknown base layer. images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', } ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images, expect_failure=FailureCodes.INVALID_REQUEST) def test_multiple_layers(self): # Push a manifest with multiple layers. images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', }, { 'id': 'baseid', 'contents': 'The base image', } ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) def test_invalid_regname(self): self.do_push('devtable', 'this/is/a/repo', 'devtable', 'password', expect_failure=FailureCodes.INVALID_REGISTRY) def test_multiple_tags(self): latest_images = [ { 'id': 'latestid', 'contents': 'the latest image' } ] foobar_images = [ { 'id': 'foobarid', 'contents': 'the foobar image', } ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=latest_images, tag_name='latest') self.do_push('devtable', 'newrepo', 'devtable', 'password', images=foobar_images, tag_name='foobar') # Retrieve the tags. response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt', expected_code=200) data = json.loads(response.text) self.assertEquals(data['name'], "devtable/newrepo") self.assertIn('latest', data['tags']) self.assertIn('foobar', data['tags']) # Retrieve the tags with pagination. response = self.conduct('GET', '/v2/devtable/newrepo/tags/list', auth='jwt', params=dict(n=1), expected_code=200) data = json.loads(response.text) self.assertEquals(data['name'], "devtable/newrepo") self.assertEquals(len(data['tags']), 1) self.assertTrue(response.headers['Link'].find('n=1&last=2') > 0) # Try to get tags before a repo exists. self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401) def test_one_five_blacklist(self): self.conduct('GET', '/v2/', expected_code=404, user_agent='Go 1.1 package http') class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 push, V2 pull registry. """ class V1PullV2PushRegistryTests(V1RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 pull, V2 push registry. """ class TorrentTestMixin(V2RegistryPullMixin): """ Mixin of tests for torrent support. """ def get_torrent(self, blobsum): # Enable direct download URLs in fake storage. self.conduct('POST', '/__test/fakestoragedd/true') response = self.conduct('GET', '/c1/torrent/devtable/newrepo/blobs/' + blobsum, auth=('devtable', 'password')) # Disable direct download URLs in fake storage. self.conduct('POST', '/__test/fakestoragedd/false') return response.content def test_get_basic_torrent(self): initial_images = [ { 'id': 'initialid', 'contents': 'the initial image', }, ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) # Retrieve the manifest for the tag. blobs = self.do_pull('devtable', 'newrepo', 'devtable', 'password', manifest_id='latest', images=initial_images) self.assertEquals(1, len(list(blobs.keys()))) blobsum = list(blobs.keys())[0] # Retrieve the torrent for the tag. torrent = self.get_torrent(blobsum) contents = bencode.bdecode(torrent) # Ensure that there is a webseed. self.assertEquals(contents['url-list'], 'http://somefakeurl') # Ensure there is an announce and some pieces. self.assertIsNotNone(contents.get('info', {}).get('pieces')) self.assertIsNotNone(contents.get('announce')) sha = resumablehashlib.sha1() sha.update(blobs[blobsum]) expected = binascii.hexlify(sha.digest()) found = binascii.hexlify(contents['info']['pieces']) self.assertEquals(expected, found) class TorrentV1PushTests(RegistryTestCaseMixin, TorrentTestMixin, V1RegistryPushMixin, LiveServerTestCase): """ Torrent tests via V1 push. """ pass class TorrentV2PushTests(RegistryTestCaseMixin, TorrentTestMixin, V2RegistryPushMixin, LiveServerTestCase): """ Torrent tests via V2 push. """ pass class SquashingTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase): """ Tests for registry squashing. """ def get_squashed_image(self): response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth='sig') tar = tarfile.open(fileobj=StringIO(response.content)) return tar, response.content def test_squashed_changes(self): initial_images = [ { 'id': 'initialid', 'contents': 'the initial image', }, ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' # Pull the squashed version of the tag. tar, _ = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) # Change the images. updated_images = [ { 'id': 'updatedid', 'contents': 'the updated image', }, ] self.do_push('devtable', 'newrepo', 'devtable', 'password', images=updated_images) updated_image_id = '38df4bd4cdffc6b7d656dbd2813c73e864f2d362ad887c999ac315224ad281ac' # Pull the squashed version of the tag and ensure it has changed. tar, _ = self.get_squashed_image() self.assertTrue(updated_image_id in tar.getnames()) def test_estimated_squashing(self): initial_images = [ { 'id': 'initialid', 'contents': 'the initial image', 'size': 2002, }, ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) # NULL out the uncompressed size to force estimation. self.conduct('POST', '/__test/removeuncompressed/initialid') # Pull the squashed version of the tag. initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' tar, _ = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) def test_multilayer_squashing(self): images = [ { 'id': 'latestid', 'contents': 'the latest image', 'parent': 'baseid', }, { 'id': 'baseid', 'contents': 'The base image', } ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=images) # Pull the squashed version of the tag. expected_image_id = 'bd590ae79fba5ebc6550aaf016c0bd0f49b1d78178e0f83e0ca1c56c2bb7e7bf' expected_names = ['repositories', expected_image_id, '%s/json' % expected_image_id, '%s/VERSION' % expected_image_id, '%s/layer.tar' % expected_image_id] tar, _ = self.get_squashed_image() self.assertEquals(expected_names, tar.getnames()) self.assertEquals('1.0', tar.extractfile(tar.getmember('%s/VERSION' % expected_image_id)).read()) json_data = (tar.extractfile(tar.getmember('%s/json' % expected_image_id)).read()) # Ensure the JSON loads and parses. result = json.loads(json_data) self.assertEquals(expected_image_id, result['id']) # Ensure that the "image_name" file refers to the latest image, as it is the top layer. layer_tar = tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) image_contents = layer_tar.extractfile(layer_tar.getmember('contents')).read() self.assertEquals('the latest image', image_contents) def test_squashed_torrent(self): initial_images = [ { 'id': 'initialid', 'contents': 'the initial image', }, ] # Create the repo. self.do_push('devtable', 'newrepo', 'devtable', 'password', images=initial_images) initial_image_id = '91081df45b58dc62dd207441785eef2b895f0383fbe601c99a3cf643c79957dc' # Try to pull the torrent of the squashed image. This should fail with a 404 since the # squashed image doesn't yet exist. self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth=('devtable', 'password'), headers=dict(accept='application/x-bittorrent'), expected_code=406) # Pull the squashed version of the tag. tar, squashed = self.get_squashed_image() self.assertTrue(initial_image_id in tar.getnames()) # Enable direct download URLs in fake storage. self.conduct('POST', '/__test/fakestoragedd/true') # Pull the torrent. response = self.conduct('GET', '/c1/squash/devtable/newrepo/latest', auth=('devtable', 'password'), headers=dict(accept='application/x-bittorrent')) # Disable direct download URLs in fake storage. self.conduct('POST', '/__test/fakestoragedd/false') # Ensure the torrent is valid. contents = bencode.bdecode(response.content) # Ensure that there is a webseed. self.assertEquals(contents['url-list'], 'http://somefakeurl') # Ensure there is an announce and some pieces. self.assertIsNotNone(contents.get('info', {}).get('pieces')) self.assertIsNotNone(contents.get('announce')) # Ensure the SHA1 matches the generated tar. sha = resumablehashlib.sha1() sha.update(squashed) expected = binascii.hexlify(sha.digest()) found = binascii.hexlify(contents['info']['pieces']) self.assertEquals(expected, found) class LoginTests(object): """ Generic tests for registry login. """ def test_invalid_username_knownrepo(self): self.do_login('invaliduser', 'somepassword', expect_success=False, scope='repository:devtable/simple:pull') def test_invalid_password_knownrepo(self): self.do_login('devtable', 'somepassword', expect_success=False, scope='repository:devtable/simple:pull') def test_validuser_knownrepo(self): self.do_login('devtable', 'password', expect_success=True, scope='repository:devtable/simple:pull') def test_validuser_encryptedpass(self): # Generate an encrypted password. self.conduct_api_login('devtable', 'password') resp = self.conduct('POST', '/api/v1/user/clientkey', json_data=dict(password='password')) encryptedpassword = resp.json()['key'] self.do_login('devtable', encryptedpassword, expect_success=True, scope='repository:devtable/simple:pull') def test_robotkey(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') resp = self.conduct('GET', '/api/v1/user/robots/dtrobot') robot_token = resp.json()['token'] self.do_login('devtable+dtrobot', robot_token, expect_success=True, scope='repository:devtable/complex:pull') def test_oauth(self): self.do_login('$oauthtoken', 'test', expect_success=True, scope='repository:devtable/complex:pull') class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): """ Tests for V1 login. """ pass # No additional tests. class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): """ Tests for V2 login. """ @staticmethod @lru_cache(maxsize=1) def load_public_key(certificate_file_path): with open(certificate_file_path) as cert_file: cert_obj = load_pem_x509_certificate(cert_file.read(), default_backend()) return cert_obj.public_key() def do_logincheck(self, username, password, scope, expected_actions=[], expect_success=True): response = self.do_login(username, password, scope, expect_success=expect_success) if not expect_success: return # Validate the returned JWT. encoded = response.json()['token'] expected_issuer = app.config['JWT_AUTH_TOKEN_ISSUER'] audience = app.config['SERVER_HOSTNAME'] max_signed_s = app.config.get('JWT_AUTH_MAX_FRESH_S', 3660) certificate_file_path = app.config['JWT_AUTH_CERTIFICATE_PATH'] public_key = V2LoginTests.load_public_key(certificate_file_path) max_exp = strictjwt.exp_max_s_option(max_signed_s) payload = strictjwt.decode(encoded, public_key, algorithms=['RS256'], audience=audience, issuer=expected_issuer, options=max_exp) if scope is None: self.assertEquals(0, len(payload['access'])) else: self.assertEquals(1, len(payload['access'])) self.assertEquals(payload['access'][0]['actions'], expected_actions) def test_nouser_noscope(self): self.do_logincheck('', '', expect_success=False, scope=None) def test_validuser_unknownrepo(self): self.do_logincheck('devtable', 'password', expect_success=True, scope='repository:invalidnamespace/simple:pull', expected_actions=[]) def test_validuser_unknownnamespacerepo(self): self.do_logincheck('devtable', 'password', expect_success=True, scope='repository:devtable/newrepo:push', expected_actions=['push']) def test_validuser_noaccess(self): self.do_logincheck('public', 'password', expect_success=True, scope='repository:devtable/simple:pull', expected_actions=[]) def test_validuser_noscope(self): self.do_logincheck('public', 'password', expect_success=True, scope=None) def test_invaliduser_noscope(self): self.do_logincheck('invaliduser', 'invalidpass', expect_success=False, scope=None) def test_invalidpassword_noscope(self): self.do_logincheck('public', 'invalidpass', expect_success=False, scope=None) def test_oauth_noaccess(self): self.do_logincheck('$oauthtoken', 'test', expect_success=True, scope='repository:freshuser/unknownrepo:pull,push', expected_actions=[]) def test_oauth_public(self): self.do_logincheck('$oauthtoken', 'test', expect_success=True, scope='repository:public/publicrepo:pull,push', expected_actions=['pull']) def test_nouser_pull_publicrepo(self): self.do_logincheck('', '', expect_success=True, scope='repository:public/publicrepo:pull', expected_actions=['pull']) def test_nouser_push_publicrepo(self): self.do_logincheck('', '', expect_success=True, scope='repository:public/publicrepo:push', expected_actions=[]) def test_library_invaliduser(self): self.do_logincheck('invaliduser', 'password', expect_success=False, scope='repository:librepo:pull,push') def test_library_noaccess(self): self.do_logincheck('freshuser', 'password', expect_success=True, scope='repository:librepo:pull,push', expected_actions=[]) def test_library_access(self): self.do_logincheck('devtable', 'password', expect_success=True, scope='repository:librepo:pull,push', expected_actions=['push', 'pull']) def test_nouser_pushpull_publicrepo(self): # Note: Docker 1.8.3 will ask for both push and pull scopes at all times. For public pulls # with no credentials, we were returning a 401. This test makes sure we get back just a pull # token. self.do_logincheck('', '', expect_success=True, scope='repository:public/publicrepo:pull,push', expected_actions=['pull']) if __name__ == '__main__': unittest.main()