Merge pull request #53 from coreos-inc/v1test

Add tests for the registry that mimic Docker's calls
This commit is contained in:
Jake Moshenko 2015-06-02 12:24:42 -04:00
commit 7bc5f7a1ca
5 changed files with 262 additions and 8 deletions

View file

@ -53,6 +53,7 @@ RUN mkdir /usr/local/nginx/logs/
# Run the tests
RUN TEST=true venv/bin/python -m unittest discover -f
RUN TEST=true venv/bin/python -m test.registry_tests -f
VOLUME ["/conf/stack", "/var/log", "/datastorage", "/tmp", "/conf/etcd"]

View file

@ -51,4 +51,5 @@ mock
psutil
stringscore
python-swiftclient
python-keystoneclient
python-keystoneclient
Flask-Testing

View file

@ -5,6 +5,7 @@ Flask-Login==0.2.11
Flask-Mail==0.9.1
Flask-Principal==0.4.0
Flask-RESTful==0.2.12
Flask-Testing==0.4.2
Jinja2==2.7.3
Logentries==0.7
Mako==1.0.1

View file

@ -1,27 +1,31 @@
from storage.basestorage import BaseStorage
_FAKE_STORAGE_MAP = {}
class FakeStorage(BaseStorage):
def _init_path(self, path=None, create=False):
return path
def get_content(self, path):
raise IOError('Fake files are fake!')
if not path in _FAKE_STORAGE_MAP:
raise IOError('Fake file %s not found' % path)
return _FAKE_STORAGE_MAP.get(path)
def put_content(self, path, content):
return path
_FAKE_STORAGE_MAP[path] = content
def stream_read(self, path):
yield ''
yield _FAKE_STORAGE_MAP[path]
def stream_write(self, path, fp, content_type=None, content_encoding=None):
pass
_FAKE_STORAGE_MAP[path] = fp.read()
def remove(self, path):
pass
_FAKE_STORAGE_MAP.pop(path, None)
def exists(self, path):
return False
return path in _FAKE_STORAGE_MAP
def get_checksum(self, path):
return 'abcdefg'
return path

247
test/registry_tests.py Normal file
View file

@ -0,0 +1,247 @@
import unittest
import requests
from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase
from app import app
from endpoints.registry import registry
from endpoints.index import index
from endpoints.tags import tags
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token
import endpoints.decorated
import json
import tarfile
from cStringIO import StringIO
from util.checksums import compute_simple
try:
app.register_blueprint(index, url_prefix='/v1')
app.register_blueprint(tags, url_prefix='/v1')
app.register_blueprint(registry, url_prefix='/v1')
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# Blueprint was already registered
pass
# Add a test blueprint for generating CSRF tokens.
testbp = Blueprint('testbp', __name__)
@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
return generate_csrf_token()
app.register_blueprint(testbp, url_prefix='/__test')
class RegistryTestCase(LiveServerTestCase):
maxDiff = None
def create_app(self):
app.config['TESTING'] = True
return app
def setUp(self):
# Note: We cannot use the normal savepoint-based DB setup here because we are accessing
# different app instances remotely via a live webserver, which is multiprocess. Therefore, we
# completely clear the database between tests.
wipe_database()
initialize_database()
populate_database()
self.clearSession()
def clearSession(self):
self.session = requests.Session()
self.signature = None
self.docker_token = 'true'
# Load the CSRF token.
self.csrf_token = ''
self.csrf_token = self.conduct('GET', '/__test/csrf').text
def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
headers = headers or {}
headers['X-Docker-Token'] = self.docker_token
if self.signature and not auth:
headers['Authorization'] = 'token ' + self.signature
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
auth=auth, params=dict(_csrf_token=self.csrf_token))
if response.status_code != expected_code:
print response.text
if 'www-authenticate' in response.headers:
self.signature = response.headers['www-authenticate']
if 'X-Docker-Token' in response.headers:
self.docker_token = response.headers['X-Docker-Token']
self.assertEquals(response.status_code, expected_code)
return response
def ping(self):
self.conduct('GET', '/v1/_ping')
def do_login(self, username, password='password'):
self.ping()
result = self.conduct('POST', '/v1/users/',
data=json.dumps(dict(username=username, password=password,
email='bar@example.com')),
headers={"Content-Type": "application/json"},
expected_code=400)
self.assertEquals(result.text, '"Username or email already exists"')
self.conduct('GET', '/v1/users/', auth=(username, password))
def do_push(self, namespace, repository, username, password, images):
auth = (username, password)
# Ping!
self.ping()
# PUT /v1/repositories/{namespace}/{repository}/
data = [{"id": image['id']} for image in images]
self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
data=json.dumps(data), auth=auth,
expected_code=201)
for image in images:
# PUT /v1/images/{imageID}/json
self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))
# PUT /v1/images/{imageID}/layer
tar_file_info = tarfile.TarInfo(name='image_name')
tar_file_info.type = tarfile.REGTYPE
tar_file_info.size = len(image['id'])
layer_data = StringIO()
tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
tar_file.addfile(tar_file_info, StringIO(image['id']))
tar_file.close()
layer_bytes = layer_data.getvalue()
layer_data.close()
self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))
# PUT /v1/images/{imageID}/checksum
checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
headers={'X-Docker-Checksum-Payload': checksum})
# PUT /v1/repositories/{namespace}/{repository}/tags/latest
self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
data='"' + images[0]['id'] + '"')
# PUT /v1/repositories/{namespace}/{repository}/images
self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
expected_code=204)
def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
auth = None
if username:
auth = (username, password)
# Ping!
self.ping()
prefix = '/v1/repositories/%s/%s/' % (namespace, repository)
# GET /v1/repositories/{namespace}/{repository}/
self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
if expected_code != 200:
return
# GET /v1/repositories/{namespace}/{repository}/
result = json.loads(self.conduct('GET', prefix + 'tags').text)
for image_id in result.values():
# /v1/images/{imageID}/{ancestry, json, layer}
image_prefix = '/v1/images/%s/' % image_id
self.conduct('GET', image_prefix + 'ancestry')
self.conduct('GET', image_prefix + 'json')
self.conduct('GET', image_prefix + 'layer')
def conduct_api_login(self, username, password):
self.conduct('POST', '/api/v1/signin',
data=json.dumps(dict(username=username, password=password)),
headers={'Content-Type': 'application/json'})
def change_repo_visibility(self, repository, namespace, visibility):
self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
data=json.dumps(dict(visibility=visibility)),
headers={'Content-Type': 'application/json'})
class RegistryTests(RegistryTestCase):
def test_pull_publicrepo_anonymous(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo anonymously, which should fail (since it is
# private)
self.do_pull('public', 'newrepo', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository anonymously, which should succeed because the repository is public.
self.do_pull('public', 'newrepo')
def test_pull_publicrepo_devtable(self):
# Add a new repository under the public user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('public', 'newrepo', 'public', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as devtable, which should fail as it belongs
# to public.
self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)
# Make the repository public.
self.conduct_api_login('public', 'password')
self.change_repo_visibility('public', 'newrepo', 'public')
self.clearSession()
# Pull the repository as devtable, which should succeed because the repository is public.
self.do_pull('public', 'newrepo', 'devtable', 'password')
def test_pull_private_repo(self):
# Add a new repository under the devtable user, so we have a real repository to pull.
images = [{
'id': 'onlyimagehere'
}]
self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
self.clearSession()
# First try to pull the (currently private) repo as public, which should fail as it belongs
# to devtable.
self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)
# Pull the repository as devtable, which should succeed because the repository is owned by
# devtable.
self.do_pull('devtable', 'newrepo', 'devtable', 'password')
if __name__ == '__main__':
unittest.main()