Merge branch 'master' into nolurk
commit c0e995c1d4
43 changed files with 1091 additions and 127 deletions
test/registry_tests.py (new file, 247 lines)
@@ -0,0 +1,247 @@
import unittest
import requests

from flask.blueprints import Blueprint
from flask.ext.testing import LiveServerTestCase

from app import app
from endpoints.registry import registry
from endpoints.index import index
from endpoints.tags import tags
from endpoints.api import api_bp
from initdb import wipe_database, initialize_database, populate_database
from endpoints.csrf import generate_csrf_token

import endpoints.decorated
import json

import tarfile

from cStringIO import StringIO
from util.checksums import compute_simple

try:
  app.register_blueprint(index, url_prefix='/v1')
  app.register_blueprint(tags, url_prefix='/v1')
  app.register_blueprint(registry, url_prefix='/v1')
  app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
  # Blueprint was already registered
  pass


# Add a test blueprint for generating CSRF tokens.
testbp = Blueprint('testbp', __name__)
@testbp.route('/csrf', methods=['GET'])
def generate_csrf():
  return generate_csrf_token()

app.register_blueprint(testbp, url_prefix='/__test')


class RegistryTestCase(LiveServerTestCase):
  maxDiff = None

  def create_app(self):
    app.config['TESTING'] = True
    return app

  def setUp(self):
    # Note: We cannot use the normal savepoint-based DB setup here because we are accessing
    # different app instances remotely via a live webserver, which is multiprocess. Therefore, we
    # completely clear the database between tests.
    wipe_database()
    initialize_database()
    populate_database()

    self.clearSession()

  def clearSession(self):
    self.session = requests.Session()
    self.signature = None
    self.docker_token = 'true'

    # Load the CSRF token.
    self.csrf_token = ''
    self.csrf_token = self.conduct('GET', '/__test/csrf').text
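
  # conduct() emulates the docker v1 client's request flow: it sends the X-Docker-Token header,
  # reuses any token signature returned via www-authenticate as an Authorization header on later
  # requests, passes the CSRF token as a query parameter, and asserts the expected status code.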
  def conduct(self, method, url, headers=None, data=None, auth=None, expected_code=200):
    headers = headers or {}
    headers['X-Docker-Token'] = self.docker_token

    if self.signature and not auth:
      headers['Authorization'] = 'token ' + self.signature

    response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
                                    auth=auth, params=dict(_csrf_token=self.csrf_token))
    if response.status_code != expected_code:
      print response.text

    if 'www-authenticate' in response.headers:
      self.signature = response.headers['www-authenticate']

    if 'X-Docker-Token' in response.headers:
      self.docker_token = response.headers['X-Docker-Token']

    self.assertEquals(response.status_code, expected_code)
    return response

  def ping(self):
    self.conduct('GET', '/v1/_ping')
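
  # Logging in mirrors the docker CLI against a v1 index: POSTing an existing username to
  # /v1/users/ is expected to return 400 ("Username or email already exists"), after which the
  # credentials are checked with a basic-auth GET to /v1/users/.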
  def do_login(self, username, password='password'):
    self.ping()
    result = self.conduct('POST', '/v1/users/',
                          data=json.dumps(dict(username=username, password=password,
                                               email='bar@example.com')),
                          headers={"Content-Type": "application/json"},
                          expected_code=400)

    self.assertEquals(result.text, '"Username or email already exists"')
    self.conduct('GET', '/v1/users/', auth=(username, password))
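
  # do_push() walks the docker registry v1 push sequence: create the repository, then for each
  # image PUT its json metadata, a gzipped tar layer and a checksum, and finally set the
  # 'latest' tag and mark the image set complete.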
  def do_push(self, namespace, repository, username, password, images):
    auth = (username, password)

    # Ping!
    self.ping()

    # PUT /v1/repositories/{namespace}/{repository}/
    data = [{"id": image['id']} for image in images]
    self.conduct('PUT', '/v1/repositories/%s/%s' % (namespace, repository),
                 data=json.dumps(data), auth=auth,
                 expected_code=201)

    for image in images:
      # PUT /v1/images/{imageID}/json
      self.conduct('PUT', '/v1/images/%s/json' % image['id'], data=json.dumps(image))

      # PUT /v1/images/{imageID}/layer
      tar_file_info = tarfile.TarInfo(name='image_name')
      tar_file_info.type = tarfile.REGTYPE
      tar_file_info.size = len(image['id'])

      layer_data = StringIO()

      tar_file = tarfile.open(fileobj=layer_data, mode='w|gz')
      tar_file.addfile(tar_file_info, StringIO(image['id']))
      tar_file.close()

      layer_bytes = layer_data.getvalue()
      layer_data.close()

      self.conduct('PUT', '/v1/images/%s/layer' % image['id'], data=StringIO(layer_bytes))

      # PUT /v1/images/{imageID}/checksum
      checksum = compute_simple(StringIO(layer_bytes), json.dumps(image))
      self.conduct('PUT', '/v1/images/%s/checksum' % image['id'],
                   headers={'X-Docker-Checksum-Payload': checksum})


    # PUT /v1/repositories/{namespace}/{repository}/tags/latest
    self.conduct('PUT', '/v1/repositories/%s/%s/tags/latest' % (namespace, repository),
                 data='"' + images[0]['id'] + '"')

    # PUT /v1/repositories/{namespace}/{repository}/images
    self.conduct('PUT', '/v1/repositories/%s/%s/images' % (namespace, repository),
                 expected_code=204)
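
  # do_pull() follows the v1 pull sequence: list the repository images (which also establishes
  # auth), read the tags, then fetch ancestry, json and layer for each tagged image.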
  def do_pull(self, namespace, repository, username=None, password='password', expected_code=200):
    auth = None
    if username:
      auth = (username, password)

    # Ping!
    self.ping()

    prefix = '/v1/repositories/%s/%s/' % (namespace, repository)

    # GET /v1/repositories/{namespace}/{repository}/images
    self.conduct('GET', prefix + 'images', auth=auth, expected_code=expected_code)
    if expected_code != 200:
      return

    # GET /v1/repositories/{namespace}/{repository}/tags
    result = json.loads(self.conduct('GET', prefix + 'tags').text)

    for image_id in result.values():
      # GET /v1/images/{imageID}/{ancestry, json, layer}
      image_prefix = '/v1/images/%s/' % image_id
      self.conduct('GET', image_prefix + 'ancestry')
      self.conduct('GET', image_prefix + 'json')
      self.conduct('GET', image_prefix + 'layer')

  def conduct_api_login(self, username, password):
    self.conduct('POST', '/api/v1/signin',
                 data=json.dumps(dict(username=username, password=password)),
                 headers={'Content-Type': 'application/json'})
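
  # Note: the tests below call this as (namespace, repository, visibility), so these first two
  # parameter names are effectively swapped; the interpolated URL still comes out as
  # /api/v1/repository/<namespace>/<repository>/changevisibility for those calls.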
  def change_repo_visibility(self, repository, namespace, visibility):
    self.conduct('POST', '/api/v1/repository/%s/%s/changevisibility' % (repository, namespace),
                 data=json.dumps(dict(visibility=visibility)),
                 headers={'Content-Type': 'application/json'})
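

# The 'public' and 'devtable' accounts (password 'password') used by these tests are fixtures
# seeded by populate_database() from initdb during setUp.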
class RegistryTests(RegistryTestCase):
  def test_pull_publicrepo_anonymous(self):
    # Add a new repository under the public user, so we have a real repository to pull.
    images = [{
      'id': 'onlyimagehere'
    }]
    self.do_push('public', 'newrepo', 'public', 'password', images)
    self.clearSession()

    # First try to pull the (currently private) repo anonymously, which should fail (since it is
    # private)
    self.do_pull('public', 'newrepo', expected_code=403)

    # Make the repository public.
    self.conduct_api_login('public', 'password')
    self.change_repo_visibility('public', 'newrepo', 'public')
    self.clearSession()

    # Pull the repository anonymously, which should succeed because the repository is public.
    self.do_pull('public', 'newrepo')


  def test_pull_publicrepo_devtable(self):
    # Add a new repository under the public user, so we have a real repository to pull.
    images = [{
      'id': 'onlyimagehere'
    }]
    self.do_push('public', 'newrepo', 'public', 'password', images)
    self.clearSession()

    # First try to pull the (currently private) repo as devtable, which should fail as it belongs
    # to public.
    self.do_pull('public', 'newrepo', 'devtable', 'password', expected_code=403)

    # Make the repository public.
    self.conduct_api_login('public', 'password')
    self.change_repo_visibility('public', 'newrepo', 'public')
    self.clearSession()

    # Pull the repository as devtable, which should succeed because the repository is public.
    self.do_pull('public', 'newrepo', 'devtable', 'password')


  def test_pull_private_repo(self):
    # Add a new repository under the devtable user, so we have a real repository to pull.
    images = [{
      'id': 'onlyimagehere'
    }]
    self.do_push('devtable', 'newrepo', 'devtable', 'password', images)
    self.clearSession()

    # First try to pull the (currently private) repo as public, which should fail as it belongs
    # to devtable.
    self.do_pull('devtable', 'newrepo', 'public', 'password', expected_code=403)

    # Pull the repository as devtable, which should succeed because the repository is owned by
    # devtable.
    self.do_pull('devtable', 'newrepo', 'devtable', 'password')


if __name__ == '__main__':
  unittest.main()

test/test_ldap.py
@@ -38,45 +38,95 @@ class TestLDAP(unittest.TestCase):
        'ou': 'employees',
        'uid': ['nomail'],
        'userPassword': ['somepass']
      }
      },
      'uid=cool.user,ou=employees,dc=quay,dc=io': {
        'dc': ['quay', 'io'],
        'ou': 'employees',
        'uid': ['cool.user', 'referred'],
        'userPassword': ['somepass'],
        'mail': ['foo@bar.com']
      },
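      # The entries below exercise referral and multi-entry handling: '_referral' points a
      # lookup at another DN, and the same 'multientry' uid exists under two subgroups.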
      'uid=referred,ou=employees,dc=quay,dc=io': {
        'uid': ['referred'],
        '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
      },
      'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
        'uid': ['invalidreferred'],
        '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
      },
      'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
        'uid': ['multientry'],
        'mail': ['foo@bar.com'],
        'userPassword': ['somepass'],
      },
      'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
        'uid': ['multientry'],
        'another': ['key']
      },
    })

    self.mockldap.start()
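
    # Build a single LDAPUsers client against the mock directory; the tests use it as self.ldap.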
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr)

    self.ldap = ldap


  def tearDown(self):
    self.mockldap.stop()
    finished_database_for_testing(self)
    self.ctx.__exit__(True, None, None)

  def test_login(self):
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'
    # Verify we can login.
    (response, _) = self.ldap.verify_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr)

    (response, _) = ldap.verify_user('someuser', 'somepass')
    # Verify we can confirm the user.
    (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
    self.assertEquals(response.username, 'someuser')

  def test_missing_mail(self):
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'

    ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr)

    (response, err_msg) = ldap.verify_user('nomail', 'somepass')
    (response, err_msg) = self.ldap.verify_user('nomail', 'somepass')
    self.assertIsNone(response)
    self.assertEquals('Missing mail field "mail" in user record', err_msg)

  def test_confirm_different_username(self):
    # Verify that the user is logged in and their username was adjusted.
    (response, _) = self.ldap.verify_user('cool.user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify we can confirm the user's quay username.
    (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify that we *cannot* confirm the LDAP username.
    (response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
    self.assertIsNone(response)

  def test_referral(self):
    (response, _) = self.ldap.verify_user('referred', 'somepass')
    self.assertEquals(response.username, 'cool_user')

    # Verify we can confirm the user's quay username.
    (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
    self.assertEquals(response.username, 'cool_user')

  def test_invalid_referral(self):
    (response, _) = self.ldap.verify_user('invalidreferred', 'somepass')
    self.assertIsNone(response)

  def test_multientry(self):
    (response, _) = self.ldap.verify_user('multientry', 'somepass')
    self.assertEquals(response.username, 'multientry')

if __name__ == '__main__':
  unittest.main()

test/test_trigger.py (new file, 29 lines)
@@ -0,0 +1,29 @@
import unittest
import re

from endpoints.trigger import matches_ref

class TestRegex(unittest.TestCase):
  def assertDoesNotMatch(self, ref, filt):
    self.assertFalse(matches_ref(ref, re.compile(filt)))

  def assertMatches(self, ref, filt):
    self.assertTrue(matches_ref(ref, re.compile(filt)))
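
  # Judging by the assertions below, the filter regex is applied to the ref with its leading
  # 'ref/' segment dropped (e.g. 'heads/master'), so filters like 'heads/.+' and 'heads/master'
  # match 'ref/heads/master'.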
  def test_matches_ref(self):
    self.assertMatches('ref/heads/master', '.+')
    self.assertMatches('ref/heads/master', 'heads/.+')
    self.assertMatches('ref/heads/master', 'heads/master')

    self.assertDoesNotMatch('ref/heads/foobar', 'heads/master')
    self.assertDoesNotMatch('ref/heads/master', 'tags/master')

    self.assertMatches('ref/heads/master', '(((heads/alpha)|(heads/beta))|(heads/gamma))|(heads/master)')
    self.assertMatches('ref/heads/alpha', '(((heads/alpha)|(heads/beta))|(heads/gamma))|(heads/master)')
    self.assertMatches('ref/heads/beta', '(((heads/alpha)|(heads/beta))|(heads/gamma))|(heads/master)')
    self.assertMatches('ref/heads/gamma', '(((heads/alpha)|(heads/beta))|(heads/gamma))|(heads/master)')

    self.assertDoesNotMatch('ref/heads/delta', '(((heads/alpha)|(heads/beta))|(heads/gamma))|(heads/master)')

if __name__ == '__main__':
  unittest.main()