Merge pull request #976 from coreos-inc/incidentaltests

Add login tests and fix scope security issue
This commit is contained in:
josephschorr 2015-11-24 13:42:06 -05:00
commit 0dbd19a236
2 changed files with 156 additions and 10 deletions

View file

@ -78,10 +78,16 @@ def generate_registry_jwt():
if not REPOSITORY_NAME_REGEX.match(reponame):
abort(400)
if 'pull' in actions and 'push' in actions:
final_actions = []
if 'push' in actions:
# If there is no valid user or token, then the repository cannot be
# accessed.
if user is None and token is None:
abort(401)
# Lookup the repository. If it exists, make sure the entity has modify
# permission. Otherwise, make sure the entity has create permission.
repo = model.repository.get_repository(namespace, reponame)
if repo:
if not ModifyRepositoryPermission(namespace, reponame).can():
@ -92,15 +98,25 @@ def generate_registry_jwt():
logger.debug('Creating repository: %s/%s', namespace, reponame)
model.repository.create_repository(namespace, reponame, user)
elif 'pull' in actions:
if (not ReadRepositoryPermission(namespace, reponame).can() and
not model.repository.repository_is_public(namespace, reponame)):
final_actions.append('push')
if 'pull' in actions:
# Grant pull if the user can read the repo or it is public. We also
# grant it if the user already has push, as they can clearly change
# the repository.
if (ReadRepositoryPermission(namespace, reponame).can() or
model.repository.repository_is_public(namespace, reponame) or
'push' in final_actions):
final_actions.append('pull')
else:
abort(403)
access.append({
'type': 'repository',
'name': namespace_and_repo,
'actions': actions,
'actions': final_actions,
})
elif user is None and token is None:

View file

@ -180,7 +180,8 @@ class RegistryTestCaseMixin(LiveServerTestCase):
class BaseRegistryMixin(object):
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200):
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200,
json_data=None):
params = params or {}
params['_csrf_token'] = self.csrf_token
@ -199,6 +200,10 @@ class BaseRegistryMixin(object):
elif auth:
auth_tuple = auth
if json_data is not None:
data = json.dumps(json_data)
headers['Content-Type'] = 'application/json'
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
auth=auth_tuple, params=params)
if response.status_code != expected_code:
@ -386,14 +391,15 @@ class V2RegistryMixin(BaseRegistryMixin):
class V2RegistryPushMixin(V2RegistryMixin):
def do_push(self, namespace, repository, username, password, images=None, tag_name=None,
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200):
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200,
scopes=None):
images = images or self._get_default_images()
# Ping!
self.v2_ping()
# Auth.
self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'],
self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'],
expected_code=expected_auth_code)
if expected_auth_code != 200:
@ -722,6 +728,24 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V2 registry. """
def test_push_only_push_scope(self):
images = [{
'id': 'onlyimagehere',
'contents': 'foobar',
}]
self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images,
scopes=['push'])
def test_attempt_push_only_push_scope(self):
images = [{
'id': 'onlyimagehere',
'contents': 'foobar',
}]
self.do_push('public', 'somenewrepo', 'devtable', 'password', images,
scopes=['push'], expected_auth_code=403)
def test_push_reponame_with_slashes(self):
# Attempt to add a repository name with slashes. This should fail as we do not support it.
images = [{
@ -736,7 +760,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
def test_cancel_push(self):
self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
def test_pull_by_checksum(self):
# Add a new repository under the user, so we have a real repository to pull.
_, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password')
@ -913,7 +936,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401)
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
RegistryTestCaseMixin, LiveServerTestCase):
""" Tests for V1 push, V2 pull registry. """
@ -968,6 +990,114 @@ class VerbTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase):
image_name = layer_tar.extractfile(layer_tar.getmember('image_name')).read()
self.assertEquals('latestid', image_name)
class V1RegistryLoginMixin(object):
def do_login(self, username, password, scope, expect_success=True):
data = {
'username': username,
'password': password,
}
response = self.conduct('POST', '/v1/users/', json_data=data, expected_code=400)
if expect_success:
self.assertEquals(response.text, '"Username or email already exists"')
else:
self.assertNotEquals(response.text, '"Username or email already exists"')
class V2RegistryLoginMixin(object):
def do_login(self, username, password, scope, expect_success=True, expected_code=None):
params = {
'account': username,
'scope': scope,
'service': app.config['SERVER_HOSTNAME'],
}
if expected_code is None:
if expect_success:
expected_code = 200
else:
expected_code = 403
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
expected_code=expected_code)
return response
class LoginTests(object):
""" Generic tests for registry login. """
def test_invalid_username_knownrepo(self):
self.do_login('invaliduser', 'somepassword', expect_success=False,
scope='repository:devtable/simple:pull')
def test_invalid_password_knownrepo(self):
self.do_login('devtable', 'somepassword', expect_success=False,
scope='repository:devtable/simple:pull')
def test_validuser_knownrepo(self):
self.do_login('devtable', 'password', expect_success=True,
scope='repository:devtable/simple:pull')
def test_validuser_encryptedpass(self):
# Generate an encrypted password.
self.conduct_api_login('devtable', 'password')
resp = self.conduct('POST', '/api/v1/user/clientkey', json_data=dict(password='password'))
encryptedpassword = resp.json()['key']
self.do_login('devtable', encryptedpassword, expect_success=True,
scope='repository:devtable/simple:pull')
def test_robotkey(self):
# Lookup the robot's password.
self.conduct_api_login('devtable', 'password')
resp = self.conduct('GET', '/api/v1/user/robots/dtrobot')
robot_token = resp.json()['token']
self.do_login('devtable+dtrobot', robot_token, expect_success=True,
scope='repository:devtable/complex:pull')
def test_oauth(self):
self.do_login('$oauthtoken', 'test', expect_success=True,
scope='repository:devtable/complex:pull')
class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase):
""" Tests for V1 login. """
pass # No additional tests.
class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase):
""" Tests for V2 login. """
def test_validuser_unknownrepo(self):
self.do_login('devtable', 'password', expect_success=False,
scope='repository:invalidnamespace/simple:pull')
def test_validuser_unknownnamespacerepo(self):
self.do_login('devtable', 'password', expect_success=True,
scope='repository:devtable/newrepo:push')
def test_validuser_noaccess(self):
self.do_login('public', 'password', expect_success=False,
scope='repository:devtable/simple:pull')
def test_validuser_noscope(self):
self.do_login('public', 'password', expect_success=True, scope=None)
def test_invaliduser_noscope(self):
self.do_login('invaliduser', 'invalidpass', expected_code=401, scope=None)
def test_invalidpassword_noscope(self):
self.do_login('public', 'invalidpass', expected_code=401, scope=None)
def test_oauth_noaccess(self):
self.do_login('$oauthtoken', 'test', expect_success=False,
scope='repository:public/publicrepo:pull,push')
def test_nouser_pull_publicrepo(self):
self.do_login('', '', expect_success=True, scope='repository:public/publicrepo:pull')
def test_nouser_push_publicrepo(self):
self.do_login('', '', expected_code=401, scope='repository:public/publicrepo:push')
if __name__ == '__main__':
unittest.main()