This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry_tests.py
2015-09-17 16:48:08 -04:00

571 lines
20 KiB
Python

import unittest
import requests
import os
import Crypto.Random
from flask import request, jsonify
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase
from app import app
from data.database import close_db_filter, configure
from endpoints.v1 import v1_bp
from endpoints.v2 import v2_bp
from endpoints.v2.manifest import SignedManifestBuilder
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
from tempfile import NamedTemporaryFile
import endpoints.decorated
import json
import features
import hashlib
import tarfile
import shutil
from jwkest.jws import SIGNER_ALGS
from jwkest.jwk import RSAKey
from Crypto.PublicKey import RSA
from cStringIO import StringIO
from digest.checksums import compute_simple
try:
app.register_blueprint(v1_bp, url_prefix='/v1')
app.register_blueprint(v2_bp, url_prefix='/v2')
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# Blueprint was already registered
pass
# Add a test blueprint for generating CSRF tokens, setting feature flags and reloading the
# DB connection.
testbp = Blueprint('testbp', __name__)
@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
return generate_csrf_token()
@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
import features
old_value = features._FEATURES[feature_name].value
features._FEATURES[feature_name].value = request.get_json()['value']
return jsonify({'old_value': old_value})
@testbp.route('/reloadapp', methods=['POST'])
def reload_app():
# Close any existing connection.
close_db_filter(None)
# Reload the database config.
configure(app.config)
# Reload random after the process split, as it cannot be used uninitialized across forks.
Crypto.Random.atfork()
return 'OK'
app.register_blueprint(testbp, url_prefix='/__test')
class TestFeature(object):
""" Helper object which temporarily sets the value of a feature flag.
"""
def __init__(self, test_case, feature_flag, test_value):
self.test_case = test_case
self.feature_flag = feature_flag
self.test_value = test_value
self.old_value = None
def __enter__(self):
result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
data=json.dumps(dict(value=self.test_value)),
headers={'Content-Type': 'application/json'})
result_data = json.loads(result.text)
self.old_value = result_data['old_value']
def __exit__(self, type, value, traceback):
self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
data=json.dumps(dict(value=self.old_value)),
headers={'Content-Type': 'application/json'})
_PORT_NUMBER = 5001
_CLEAN_DATABASE_PATH = None
def get_new_database_uri():
# If a clean copy of the database has not yet been created, create one now.
global _CLEAN_DATABASE_PATH
if not _CLEAN_DATABASE_PATH:
wipe_database()
initialize_database()
populate_database()
close_db_filter(None)
# Save the path of the clean database.
_CLEAN_DATABASE_PATH = app.config['TEST_DB_FILE'].name
# Create a new temp file to be used as the actual backing database for the test.
# Note that we have the close() the file to ensure we can copy to it via shutil.
local_db_file = NamedTemporaryFile(delete=True)
local_db_file.close()
# Copy the clean database to the path.
shutil.copy2(_CLEAN_DATABASE_PATH, local_db_file.name)
return 'sqlite:///{0}'.format(local_db_file.name)
class RegistryTestCaseMixin(LiveServerTestCase):
maxDiff = None
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
app.config['DEBUG'] = True
app.config['TESTING'] = True
app.config['LIVESERVER_PORT'] = _PORT_NUMBER
app.config['DB_URI'] = get_new_database_uri()
return app
def setUp(self):
self.clearSession()
# Tell the remote running app to reload the database and app. By default, the app forks from the
# current context and has already loaded the DB config with the *original* DB URL. We call
# the remote reload method to force it to pick up the changes to DB_URI set in the create_app
# method.
self.conduct('POST', '/__test/reloadapp')
def clearSession(self):
self.session = requests.Session()
self.signature = None
self.docker_token = 'true'
self.jwt = None
# Load the CSRF token.
self.csrf_token = ''
self.csrf_token = self.conduct('GET', '/__test/csrf').text
def conduct_api_login(self, username, password):
self.conduct('POST', '/api/v1/signin',
data=json.dumps(dict(username=username, password=password)),
headers={'Content-Type': 'application/json'})
def change_repo_visibility(self, repository, namespace, visibility):
self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
data=json.dumps(dict(visibility=visibility)),
headers={'Content-Type': 'application/json'})
class BaseRegistryMixin(object):
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200):
params = params or {}
params['_csrf_token'] = self.csrf_token
headers = headers or {}
auth_tuple = None
if self.docker_token:
headers['X-Docker-Token'] = self.docker_token
if auth == 'sig':
if self.signature:
headers['Authorization'] = 'token ' + self.signature
elif auth == 'jwt':
if self.jwt:
headers['Authorization'] = 'Bearer ' + self.jwt
elif auth:
auth_tuple = auth
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
auth=auth_tuple, params=params)
if response.status_code != expected_code:
print response.text
if 'www-authenticate' in response.headers:
self.signature = response.headers['www-authenticate']
if 'X-Docker-Token' in response.headers:
self.docker_token = response.headers['X-Docker-Token']
self.assertEquals(response.status_code, expected_code)
return response
class V1RegistryMixin(BaseRegistryMixin):
def v1_ping(self):
self.conduct('GET', '/v1/_ping')
class V1RegistryPushMixin(V1RegistryMixin):
def do_push(self, namespace, repository, username, password, images):
auth = (username, password)
# Ping!
self.v1_ping()
# PUT /v1/repositories/{namespace}/{repository}/
data = [{"id": image['id']} for image in images]
self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
data=json.dumps(data), auth=auth,
expected_code=201)
for image in images:
# PUT /v1/images/{imageID}/json
self.conduct('PUT', '/v1/images/%s/json' % image['id'],
data=json.dumps(image), auth='sig')
# PUT /v1/images/{imageID}/layer
tar_file_info = tarfile.TarInfo(name='image_name')
tar_file_info.type = tarfile.REGTYPE
tar_file_info.size = len(image['id'])
layer_data = StringIO()
tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
tar_file.addfile(tar_file_info, StringIO(image['id']))
tar_file.close()
layer_bytes = layer_data.getvalue()
layer_data.close()
self.conduct('PUT', '/v1/images/%s/layer' % image['id'],
data=StringIO(layer_bytes), auth='sig')
# PUT /v1/images/{imageID}/checksum
checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
headers={'X-Docker-Checksum-Payload': checksum},
auth='sig')
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
data='"' + images[0]['id'] + '"',
auth='sig')
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
expected_code=204,
auth='sig')
class V1RegistryPullMixin(V1RegistryMixin):
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
auth = None
if username:
auth = (username, password)
# Ping!
self.v1_ping()
prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
# GET /v1/repositories/{namespace}/{repository}/
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
if expected_code != 200:
return
# GET /v1/repositories/{namespace}/{repository}/
result = json.loads(self.conduct('GET', prefix + 'tags', auth='sig').text)
for image_id in result.values():
# /v1/images/{imageID}/{ancestry, json, layer}
image_prefix = '/v1/images/%s/' % image_id
self.conduct('GET', image_prefix + 'ancestry', auth='sig')
self.conduct('GET', image_prefix + 'json', auth='sig')
self.conduct('GET', image_prefix + 'layer', auth='sig')
class V2RegistryMixin(BaseRegistryMixin):
def v2_ping(self):
self.conduct('GET', '/v2/', expected_code=200 if self.jwt else 401, auth='jwt')
def do_auth(self, username, password, namespace, repository, expected_code=200, scopes=[]):
auth = (username, password)
params = {
'account': username,
'scope': 'repository:%s/%s:%s' % (namespace, repository, ','.join(scopes)),
'service': app.config['SERVER_HOSTNAME'],
}
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
expected_code=expected_code)
if expected_code == 200:
response_json = json.loads(response.text)
self.assertIsNotNone(response_json.get('token'))
self.jwt = response_json['token']
class V2RegistryPushMixin(V2RegistryMixin):
def do_push(self, namespace, repository, username, password, images):
# Ping!
self.v2_ping()
# Auth.
self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'])
# Build a fake manifest.
images = [('somelayer', 'some fake data')]
tag_name = 'latest'
builder = SignedManifestBuilder(namespace, repository, tag_name)
for image_id, contents in images:
checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
builder.add_layer(checksum, json.dumps({'id': image_id, 'data': contents}))
# Push the image's layers.
for image_id, contents in images:
# Layer data should not yet exist.
checksum = 'sha256:' + hashlib.sha256(contents).hexdigest()
self.conduct('HEAD', '/v2/%s/%s/blobs/%s' % (namespace, repository, checksum),
expected_code=404, auth='jwt')
# Start a new upload of the layer data.
response = self.conduct('POST', '/v2/%s/%s/blobs/uploads/' % (namespace, repository),
expected_code=202, auth='jwt')
location = response.headers['Location'][len(self.get_server_url()):]
# PATCH the image data into the layer.
self.conduct('PATCH', location, data=contents, expected_code=204, auth='jwt')
# Finish the layer upload with a PUT.
self.conduct('PUT', location, params=dict(digest=checksum), expected_code=201, auth='jwt')
# Write the manifest.
new_key = RSA.generate(2048)
jwk = RSAKey(key=new_key)
manifest = builder.build(jwk)
self.conduct('PUT', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
data=manifest.bytes, expected_code=202,
headers={'Content-Type': 'application/json'}, auth='jwt')
class V2RegistryPullMixin(V2RegistryMixin):
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
# Ping!
self.v2_ping()
# Auth.
self.do_auth(username, password, namespace, repository, scopes=['pull'],
expected_code=expected_code)
if expected_code != 200:
return
# Retrieve the manifest for the tag.
tag_name = 'latest'
response = self.conduct('GET', '/v2/%s/%s/manifests/%s' % (namespace, repository, tag_name),
auth='jwt')
manifest_data = json.loads(response.text)
for layer in manifest_data['fsLayers']:
blob_id = layer['blobSum']
self.conduct('GET', '/v2/%s/%s/blobs/%s' % (namespace, repository, blob_id),
expected_code=200, auth='jwt')
class RegistryTestsMixin(object):
def test_pull_publicrepo_anonymous(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
self.do_pull('public', 'newrepo', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository anonymously, which should succeed because the repository is public.
self.do_pull('public', 'newrepo')
def test_pull_publicrepo_devtable(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_pull_private_repo(self):
# Add a new repository under the devtable user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as public, which should fail as it belongs
# to devtable.
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
# Pull the repository as devtable, which should succeed because the repository is owned by
# devtable.
self.do_pull('devtable', 'newrepo', 'devtable', 'password')
def test_public_no_anonymous_access_with_auth(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_private_no_anonymous_access(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
def test_public_no_anonymous_access_no_auth(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as anonymous, which should fail as it
# is private.
self.do_pull('public', 'newrepo', expected_code=401)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Try again to pull the (currently public) repo as anonymous, which should fail as
# anonymous access is disabled.
self.do_pull('public', 'newrepo', expected_code=401)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_create_repo_creator_user(self):
images = [{
'id': 'onlyimagehere'
}]
self.do_push('buynlarge', 'newrepo', 'creator', 'password', images)
# Pull the repository as devtable, which should succeed because the repository is owned by the
# org.
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
def test_create_repo_robot_owner(self):
# Lookup the robot's password.
self.conduct_api_login('devtable', 'password')
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
robot_token = json.loads(resp.text)['token']
images = [{
'id': 'onlyimagehere'
}]
self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images)
# Pull the repository as devtable, which should succeed because the repository is owned by the
# org.
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
def test_create_repo_robot_creator(self):
# Lookup the robot's password.
self.conduct_api_login('devtable', 'password')
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
robot_token = json.loads(resp.text)['token']
images = [{
'id': 'onlyimagehere'
}]
self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images)
# Pull the repository as devtable, which should succeed because the repository is owned by the
# org.
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
class V1RegistryTests(V1RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V1 registry. """
class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V2 registry. """
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V1 push, V2 pull registry. """
class V1PullV2PushRegistryTests(V1RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V1 pull, V2 push registry. """
if __name__ == '__main__':
unittest.main()