This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/registry_tests.py
Joseph Schorr 075c75d031 Change to always granting a signed token if there is a valid user OR if there is valid permissions on a repository
This fixes the issue whereby attempting to pull a public repository as an authenticated user with anonymous access disabled caused an unexpected 401. This change also adds tests for a few other use cases to verify we haven't broken anything.
2015-06-02 15:16:22 -04:00

350 lines
12 KiB
Python

import unittest
import requests
from flask import request, jsonify
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase
from app import app
from endpoints.registry import registry
from endpoints.index import index
from endpoints.tags import tags
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
import endpoints.decorated
import json
import features
import tarfile
from cStringIO import StringIO
from util.checksums import compute_simple
try:
app.register_blueprint(index, url_prefix='/v1')
app.register_blueprint(tags, url_prefix='/v1')
app.register_blueprint(registry, url_prefix='/v1')
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# Blueprint was already registered
pass
# Add a test blueprint for generating CSRF tokens and setting feature flags.
testbp = Blueprint('testbp', __name__)
@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
return generate_csrf_token()
@testbp.route('/feature/<feature_name>', methods=['POST'])
def set_feature(feature_name):
import features
old_value = features._FEATURES[feature_name].value
features._FEATURES[feature_name].value = request.get_json()['value']
return jsonify({'old_value': old_value})
app.register_blueprint(testbp, url_prefix='/__test')
class TestFeature(object):
""" Helper object which temporarily sets the value of a feature flag.
"""
def __init__(self, test_case, feature_flag, test_value):
self.test_case = test_case
self.feature_flag = feature_flag
self.test_value = test_value
self.old_value = None
def __enter__(self):
result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
data=json.dumps(dict(value=self.test_value)),
headers={'Content-Type': 'application/json'})
result_data = json.loads(result.text)
self.old_value = result_data['old_value']
def __exit__(self, type, value, traceback):
self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
data=json.dumps(dict(value=self.old_value)),
headers={'Content-Type': 'application/json'})
class RegistryTestCase(LiveServerTestCase):
maxDiff = None
def create_app(self):
app.config['TESTING'] = True
return app
def setUp(self):
# Note: We cannot use the normal savepoint-based DB setup here because we are accessing
# different app instances remotely via a live webserver, which is multiprocess. Therefore, we
# completely clear the database between tests.
wipe_database()
initialize_database()
populate_database()
self.clearSession()
def clearSession(self):
self.session = requests.Session()
self.signature = None
self.docker_token = 'true'
# Load the CSRF token.
self.csrf_token = ''
self.csrf_token = self.conduct('GET', '/__test/csrf').text
def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
headers = headers or {}
headers['X-Docker-Token'] = self.docker_token
if self.signature and not auth:
headers['Authorization'] = 'token ' + self.signature
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
auth=auth, params=dict(_csrf_token=self.csrf_token))
if response.status_code != expected_code:
print response.text
if 'www-authenticate' in response.headers:
self.signature = response.headers['www-authenticate']
if 'X-Docker-Token' in response.headers:
self.docker_token = response.headers['X-Docker-Token']
self.assertEquals(response.status_code, expected_code)
return response
def ping(self):
self.conduct('GET', '/v1/_ping')
def do_login(self, username, password='password'):
self.ping()
result = self.conduct('POST', '/v1/users/',
data=json.dumps(dict(username=username, password=password,
email='bar@example.com')),
headers={"Content-Type": "application/json"},
expected_code=400)
self.assertEquals(result.text, '"Username or email already exists"')
self.conduct('GET', '/v1/users/', auth=(username, password))
def do_push(self, namespace, repository, username, password, images):
auth = (username, password)
# Ping!
self.ping()
# PUT /v1/repositories/{namespace}/{repository}/
data = [{"id": image['id']} for image in images]
self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
data=json.dumps(data), auth=auth,
expected_code=201)
for image in images:
# PUT /v1/images/{imageID}/json
self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))
# PUT /v1/images/{imageID}/layer
tar_file_info = tarfile.TarInfo(name='image_name')
tar_file_info.type = tarfile.REGTYPE
tar_file_info.size = len(image['id'])
layer_data = StringIO()
tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
tar_file.addfile(tar_file_info, StringIO(image['id']))
tar_file.close()
layer_bytes = layer_data.getvalue()
layer_data.close()
self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))
# PUT /v1/images/{imageID}/checksum
checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
headers={'X-Docker-Checksum-Payload': checksum})
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
data='"' + images[0]['id'] + '"')
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
expected_code=204)
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
auth = None
if username:
auth = (username, password)
# Ping!
self.ping()
prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
# GET /v1/repositories/{namespace}/{repository}/
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
if expected_code != 200:
return
# GET /v1/repositories/{namespace}/{repository}/
result = json.loads(self.conduct('GET', prefix + 'tags').text)
for image_id in result.values():
# /v1/images/{imageID}/{ancestry, json, layer}
image_prefix = '/v1/images/%s/' % image_id
self.conduct('GET', image_prefix + 'ancestry')
self.conduct('GET', image_prefix + 'json')
self.conduct('GET', image_prefix + 'layer')
def conduct_api_login(self, username, password):
self.conduct('POST', '/api/v1/signin',
data=json.dumps(dict(username=username, password=password)),
headers={'Content-Type': 'application/json'})
def change_repo_visibility(self, repository, namespace, visibility):
self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
data=json.dumps(dict(visibility=visibility)),
headers={'Content-Type': 'application/json'})
class RegistryTests(RegistryTestCase):
def test_pull_publicrepo_anonymous(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
self.do_pull('public', 'newrepo', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository anonymously, which should succeed because the repository is public.
self.do_pull('public', 'newrepo')
def test_pull_publicrepo_devtable(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_pull_private_repo(self):
# Add a new repository under the devtable user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as public, which should fail as it belongs
# to devtable.
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
# Pull the repository as devtable, which should succeed because the repository is owned by
# devtable.
self.do_pull('devtable', 'newrepo', 'devtable', 'password')
def test_public_no_anonymous_access_with_auth(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_private_no_anonymous_access(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
def test_public_no_anonymous_access_no_auth(self):
# Turn off anonymous access.
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as anonymous, which should fail as it
# is private.
self.do_pull('public', 'newrepo', expected_code=401)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Try again to pull the (currently public) repo as anonymous, which should fail as
# anonymous access is disabled.
self.do_pull('public', 'newrepo', expected_code=401)
# Pull the repository as public, which should succeed because the repository is owned by public.
self.do_pull('public', 'newrepo', 'public', 'password')
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
if __name__ == '__main__':
unittest.main()