bea8b9ac53
Implement the minimal changes to the local filesystem storage driver and feed them through the distributed storage driver. Create a digest package which contains digest_tools and checksums. Fix the tests to use the new v1 endpoint locations. Fix repository.delete_instance to properly filter the generated queries to avoid most subquery deletes, but still generate them when not explicitly filtered.
389 lines
14 KiB
Python
389 lines
14 KiB
Python
import unittest
|
|
import requests
|
|
|
|
from flask import request, jsonify
|
|
from flask.blueprints import Blueprint
|
|
from flask.ext.testing import LiveServerTestCase
|
|
|
|
from app import app
|
|
from endpoints.v1 import v1_bp
|
|
from endpoints.api import api_bp
|
|
from initdb import wipe_database, initialize_database, populate_database
|
|
from endpoints.csrf import generate_csrf_token
|
|
|
|
import endpoints.decorated
|
|
import json
|
|
import features
|
|
|
|
import tarfile
|
|
|
|
from cStringIO import StringIO
|
|
from util.checksums import compute_simple
|
|
|
|
try:
|
|
app.register_blueprint(v1_bp, url_prefix='/v1')
|
|
app.register_blueprint(api_bp, url_prefix='/api')
|
|
except ValueError:
|
|
# Blueprint was already registered
|
|
pass
|
|
|
|
|
|
# Add a test blueprint for generating CSRF tokens and setting feature flags.
|
|
testbp = Blueprint('testbp', __name__)
|
|
|
|
@testbp.route('/csrf', methods=['GET'])
|
|
def generate_csrf():
|
|
return generate_csrf_token()
|
|
|
|
@testbp.route('/feature/<feature_name>', methods=['POST'])
|
|
def set_feature(feature_name):
|
|
import features
|
|
old_value = features._FEATURES[feature_name].value
|
|
features._FEATURES[feature_name].value = request.get_json()['value']
|
|
return jsonify({'old_value': old_value})
|
|
|
|
app.register_blueprint(testbp, url_prefix='/__test')
|
|
|
|
|
|
class TestFeature(object):
|
|
""" Helper object which temporarily sets the value of a feature flag.
|
|
"""
|
|
|
|
def __init__(self, test_case, feature_flag, test_value):
|
|
self.test_case = test_case
|
|
self.feature_flag = feature_flag
|
|
self.test_value = test_value
|
|
self.old_value = None
|
|
|
|
def __enter__(self):
|
|
result = self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
|
|
data=json.dumps(dict(value=self.test_value)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
result_data = json.loads(result.text)
|
|
self.old_value = result_data['old_value']
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.test_case.conduct('POST', '/__test/feature/' + self.feature_flag,
|
|
data=json.dumps(dict(value=self.old_value)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
class RegistryTestCase(LiveServerTestCase):
|
|
maxDiff = None
|
|
|
|
def create_app(self):
|
|
app.config['TESTING'] = True
|
|
return app
|
|
|
|
def setUp(self):
|
|
# Note: We cannot use the normal savepoint-based DB setup here because we are accessing
|
|
# different app instances remotely via a live webserver, which is multiprocess. Therefore, we
|
|
# completely clear the database between tests.
|
|
wipe_database()
|
|
initialize_database()
|
|
populate_database()
|
|
|
|
self.clearSession()
|
|
|
|
def clearSession(self):
|
|
self.session = requests.Session()
|
|
self.signature = None
|
|
self.docker_token = 'true'
|
|
|
|
# Load the CSRF token.
|
|
self.csrf_token = ''
|
|
self.csrf_token = self.conduct('GET', '/__test/csrf').text
|
|
|
|
def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
|
|
headers = headers or {}
|
|
headers['X-Docker-Token'] = self.docker_token
|
|
|
|
if self.signature and not auth:
|
|
headers['Authorization'] = 'token ' + self.signature
|
|
|
|
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
|
|
auth=auth, params=dict(_csrf_token=self.csrf_token))
|
|
if response.status_code != expected_code:
|
|
print response.text
|
|
|
|
if 'www-authenticate' in response.headers:
|
|
self.signature = response.headers['www-authenticate']
|
|
|
|
if 'X-Docker-Token' in response.headers:
|
|
self.docker_token = response.headers['X-Docker-Token']
|
|
|
|
self.assertEquals(response.status_code, expected_code)
|
|
return response
|
|
|
|
def ping(self):
|
|
self.conduct('GET', '/v1/_ping')
|
|
|
|
def do_login(self, username, password='password'):
|
|
self.ping()
|
|
result = self.conduct('POST', '/v1/users/',
|
|
data=json.dumps(dict(username=username, password=password,
|
|
email='bar@example.com')),
|
|
headers={"Content-Type": "application/json"},
|
|
expected_code=400)
|
|
|
|
self.assertEquals(result.text, '"Username or email already exists"')
|
|
self.conduct('GET', '/v1/users/', auth=(username, password))
|
|
|
|
def do_push(self, namespace, repository, username, password, images):
|
|
auth = (username, password)
|
|
|
|
# Ping!
|
|
self.ping()
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/
|
|
data = [{"id": image['id']} for image in images]
|
|
self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
|
|
data=json.dumps(data), auth=auth,
|
|
expected_code=201)
|
|
|
|
for image in images:
|
|
# PUT /v1/images/{imageID}/json
|
|
self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))
|
|
|
|
# PUT /v1/images/{imageID}/layer
|
|
tar_file_info = tarfile.TarInfo(name='image_name')
|
|
tar_file_info.type = tarfile.REGTYPE
|
|
tar_file_info.size = len(image['id'])
|
|
|
|
layer_data = StringIO()
|
|
|
|
tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
|
|
tar_file.addfile(tar_file_info, StringIO(image['id']))
|
|
tar_file.close()
|
|
|
|
layer_bytes = layer_data.getvalue()
|
|
layer_data.close()
|
|
|
|
self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))
|
|
|
|
# PUT /v1/images/{imageID}/checksum
|
|
checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
|
|
self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
|
|
headers={'X-Docker-Checksum-Payload': checksum})
|
|
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
|
|
self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
|
|
data='"' + images[0]['id'] + '"')
|
|
|
|
# PUT /v1/repositories/{namespace}/{repository}/images
|
|
self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
|
|
expected_code=204)
|
|
|
|
|
|
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
|
|
auth = None
|
|
if username:
|
|
auth = (username, password)
|
|
|
|
# Ping!
|
|
self.ping()
|
|
|
|
prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
|
|
|
|
# GET /v1/repositories/{namespace}/{repository}/
|
|
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
|
|
if expected_code != 200:
|
|
return
|
|
|
|
# GET /v1/repositories/{namespace}/{repository}/
|
|
result = json.loads(self.conduct('GET', prefix + 'tags').text)
|
|
|
|
for image_id in result.values():
|
|
# /v1/images/{imageID}/{ancestry, json, layer}
|
|
image_prefix = '/v1/images/%s/' % image_id
|
|
self.conduct('GET', image_prefix + 'ancestry')
|
|
self.conduct('GET', image_prefix + 'json')
|
|
self.conduct('GET', image_prefix + 'layer')
|
|
|
|
def conduct_api_login(self, username, password):
|
|
self.conduct('POST', '/api/v1/signin',
|
|
data=json.dumps(dict(username=username, password=password)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
def change_repo_visibility(self, repository, namespace, visibility):
|
|
self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
|
|
data=json.dumps(dict(visibility=visibility)),
|
|
headers={'Content-Type': 'application/json'})
|
|
|
|
|
|
class RegistryTests(RegistryTestCase):
|
|
def test_pull_publicrepo_anonymous(self):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo anonymously, which should fail (since it is
|
|
# private)
|
|
self.do_pull('public', 'newrepo', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository anonymously, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo')
|
|
|
|
|
|
def test_pull_publicrepo_devtable(self):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_pull_private_repo(self):
|
|
# Add a new repository under the devtable user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as public, which should fail as it belongs
|
|
# to devtable.
|
|
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by
|
|
# devtable.
|
|
self.do_pull('devtable', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_public_no_anonymous_access_with_auth(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_private_no_anonymous_access(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
|
|
# to public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
|
|
|
|
# Pull the repository as public, which should succeed because the repository is owned by public.
|
|
self.do_pull('public', 'newrepo', 'public', 'password')
|
|
|
|
|
|
def test_public_no_anonymous_access_no_auth(self):
|
|
# Turn off anonymous access.
|
|
with TestFeature(self, 'ANONYMOUS_ACCESS', False):
|
|
# Add a new repository under the public user, so we have a real repository to pull.
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('public', 'newrepo', 'public', 'password', images)
|
|
self.clearSession()
|
|
|
|
# First try to pull the (currently private) repo as anonymous, which should fail as it
|
|
# is private.
|
|
self.do_pull('public', 'newrepo', expected_code=401)
|
|
|
|
# Make the repository public.
|
|
self.conduct_api_login('public', 'password')
|
|
self.change_repo_visibility('public', 'newrepo', 'public')
|
|
self.clearSession()
|
|
|
|
# Try again to pull the (currently public) repo as anonymous, which should fail as
|
|
# anonymous access is disabled.
|
|
self.do_pull('public', 'newrepo', expected_code=401)
|
|
|
|
# Pull the repository as public, which should succeed because the repository is owned by public.
|
|
self.do_pull('public', 'newrepo', 'public', 'password')
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is public.
|
|
self.do_pull('public', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_creator_user(self):
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('buynlarge', 'newrepo', 'creator', 'password', images)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_robot_owner(self):
|
|
# Lookup the robot's password.
|
|
self.conduct_api_login('devtable', 'password')
|
|
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/ownerbot')
|
|
robot_token = json.loads(resp.text)['token']
|
|
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('buynlarge', 'newrepo', 'buynlarge+ownerbot', robot_token, images)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
|
|
def test_create_repo_robot_creator(self):
|
|
# Lookup the robot's password.
|
|
self.conduct_api_login('devtable', 'password')
|
|
resp = self.conduct('GET', '/api/v1/organization/buynlarge/robots/creatorbot')
|
|
robot_token = json.loads(resp.text)['token']
|
|
|
|
images = [{
|
|
'id': 'onlyimagehere'
|
|
}]
|
|
self.do_push('buynlarge', 'newrepo', 'buynlarge+creatorbot', robot_token, images)
|
|
|
|
# Pull the repository as devtable, which should succeed because the repository is owned by the
|
|
# org.
|
|
self.do_pull('buynlarge', 'newrepo', 'devtable', 'password')
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|