Add login tests and fix scope security issue
This commit is contained in:
parent
c9b1a02e9c
commit
75a91f0f92
2 changed files with 156 additions and 10 deletions
|
@ -78,10 +78,16 @@ def generate_registry_jwt():
|
|||
if not REPOSITORY_NAME_REGEX.match(reponame):
|
||||
abort(400)
|
||||
|
||||
if 'pull' in actions and 'push' in actions:
|
||||
final_actions = []
|
||||
|
||||
if 'push' in actions:
|
||||
# If there is no valid user or token, then the repository cannot be
|
||||
# accessed.
|
||||
if user is None and token is None:
|
||||
abort(401)
|
||||
|
||||
# Lookup the repository. If it exists, make sure the entity has modify
|
||||
# permission. Otherwise, make sure the entity has create permission.
|
||||
repo = model.repository.get_repository(namespace, reponame)
|
||||
if repo:
|
||||
if not ModifyRepositoryPermission(namespace, reponame).can():
|
||||
|
@ -92,15 +98,25 @@ def generate_registry_jwt():
|
|||
|
||||
logger.debug('Creating repository: %s/%s', namespace, reponame)
|
||||
model.repository.create_repository(namespace, reponame, user)
|
||||
elif 'pull' in actions:
|
||||
if (not ReadRepositoryPermission(namespace, reponame).can() and
|
||||
not model.repository.repository_is_public(namespace, reponame)):
|
||||
|
||||
final_actions.append('push')
|
||||
|
||||
if 'pull' in actions:
|
||||
# Grant pull if the user can read the repo or it is public. We also
|
||||
# grant it if the user already has push, as they can clearly change
|
||||
# the repository.
|
||||
if (ReadRepositoryPermission(namespace, reponame).can() or
|
||||
model.repository.repository_is_public(namespace, reponame) or
|
||||
'push' in final_actions):
|
||||
final_actions.append('pull')
|
||||
else:
|
||||
abort(403)
|
||||
|
||||
|
||||
access.append({
|
||||
'type': 'repository',
|
||||
'name': namespace_and_repo,
|
||||
'actions': actions,
|
||||
'actions': final_actions,
|
||||
})
|
||||
|
||||
elif user is None and token is None:
|
||||
|
|
|
@ -180,7 +180,8 @@ class RegistryTestCaseMixin(LiveServerTestCase):
|
|||
|
||||
|
||||
class BaseRegistryMixin(object):
|
||||
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200):
|
||||
def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200,
|
||||
json_data=None):
|
||||
params = params or {}
|
||||
params['_csrf_token'] = self.csrf_token
|
||||
|
||||
|
@ -199,6 +200,10 @@ class BaseRegistryMixin(object):
|
|||
elif auth:
|
||||
auth_tuple = auth
|
||||
|
||||
if json_data is not None:
|
||||
data = json.dumps(json_data)
|
||||
headers['Content-Type'] = 'application/json'
|
||||
|
||||
response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data,
|
||||
auth=auth_tuple, params=params)
|
||||
if response.status_code != expected_code:
|
||||
|
@ -386,14 +391,15 @@ class V2RegistryMixin(BaseRegistryMixin):
|
|||
|
||||
class V2RegistryPushMixin(V2RegistryMixin):
|
||||
def do_push(self, namespace, repository, username, password, images=None, tag_name=None,
|
||||
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200):
|
||||
cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200,
|
||||
scopes=None):
|
||||
images = images or self._get_default_images()
|
||||
|
||||
# Ping!
|
||||
self.v2_ping()
|
||||
|
||||
# Auth.
|
||||
self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'],
|
||||
self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'],
|
||||
expected_code=expected_auth_code)
|
||||
|
||||
if expected_auth_code != 200:
|
||||
|
@ -722,6 +728,24 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
RegistryTestCaseMixin, LiveServerTestCase):
|
||||
""" Tests for V2 registry. """
|
||||
|
||||
def test_push_only_push_scope(self):
|
||||
images = [{
|
||||
'id': 'onlyimagehere',
|
||||
'contents': 'foobar',
|
||||
}]
|
||||
|
||||
self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images,
|
||||
scopes=['push'])
|
||||
|
||||
def test_attempt_push_only_push_scope(self):
|
||||
images = [{
|
||||
'id': 'onlyimagehere',
|
||||
'contents': 'foobar',
|
||||
}]
|
||||
|
||||
self.do_push('public', 'somenewrepo', 'devtable', 'password', images,
|
||||
scopes=['push'], expected_auth_code=403)
|
||||
|
||||
def test_push_reponame_with_slashes(self):
|
||||
# Attempt to add a repository name with slashes. This should fail as we do not support it.
|
||||
images = [{
|
||||
|
@ -736,7 +760,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
def test_cancel_push(self):
|
||||
self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
|
||||
|
||||
|
||||
def test_pull_by_checksum(self):
|
||||
# Add a new repository under the user, so we have a real repository to pull.
|
||||
_, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password')
|
||||
|
@ -913,7 +936,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401)
|
||||
|
||||
|
||||
|
||||
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
|
||||
RegistryTestCaseMixin, LiveServerTestCase):
|
||||
""" Tests for V1 push, V2 pull registry. """
|
||||
|
@ -968,6 +990,114 @@ class VerbTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase):
|
|||
image_name = layer_tar.extractfile(layer_tar.getmember('image_name')).read()
|
||||
self.assertEquals('latestid', image_name)
|
||||
|
||||
class V1RegistryLoginMixin(object):
|
||||
def do_login(self, username, password, scope, expect_success=True):
|
||||
data = {
|
||||
'username': username,
|
||||
'password': password,
|
||||
}
|
||||
|
||||
response = self.conduct('POST', '/v1/users/', json_data=data, expected_code=400)
|
||||
if expect_success:
|
||||
self.assertEquals(response.text, '"Username or email already exists"')
|
||||
else:
|
||||
self.assertNotEquals(response.text, '"Username or email already exists"')
|
||||
|
||||
class V2RegistryLoginMixin(object):
|
||||
def do_login(self, username, password, scope, expect_success=True, expected_code=None):
|
||||
params = {
|
||||
'account': username,
|
||||
'scope': scope,
|
||||
'service': app.config['SERVER_HOSTNAME'],
|
||||
}
|
||||
|
||||
if expected_code is None:
|
||||
if expect_success:
|
||||
expected_code = 200
|
||||
else:
|
||||
expected_code = 403
|
||||
|
||||
response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password),
|
||||
expected_code=expected_code)
|
||||
return response
|
||||
|
||||
|
||||
class LoginTests(object):
|
||||
""" Generic tests for registry login. """
|
||||
def test_invalid_username_knownrepo(self):
|
||||
self.do_login('invaliduser', 'somepassword', expect_success=False,
|
||||
scope='repository:devtable/simple:pull')
|
||||
|
||||
def test_invalid_password_knownrepo(self):
|
||||
self.do_login('devtable', 'somepassword', expect_success=False,
|
||||
scope='repository:devtable/simple:pull')
|
||||
|
||||
def test_validuser_knownrepo(self):
|
||||
self.do_login('devtable', 'password', expect_success=True,
|
||||
scope='repository:devtable/simple:pull')
|
||||
|
||||
def test_validuser_encryptedpass(self):
|
||||
# Generate an encrypted password.
|
||||
self.conduct_api_login('devtable', 'password')
|
||||
resp = self.conduct('POST', '/api/v1/user/clientkey', json_data=dict(password='password'))
|
||||
|
||||
encryptedpassword = resp.json()['key']
|
||||
self.do_login('devtable', encryptedpassword, expect_success=True,
|
||||
scope='repository:devtable/simple:pull')
|
||||
|
||||
def test_robotkey(self):
|
||||
# Lookup the robot's password.
|
||||
self.conduct_api_login('devtable', 'password')
|
||||
resp = self.conduct('GET', '/api/v1/user/robots/dtrobot')
|
||||
robot_token = resp.json()['token']
|
||||
|
||||
self.do_login('devtable+dtrobot', robot_token, expect_success=True,
|
||||
scope='repository:devtable/complex:pull')
|
||||
|
||||
def test_oauth(self):
|
||||
self.do_login('$oauthtoken', 'test', expect_success=True,
|
||||
scope='repository:devtable/complex:pull')
|
||||
|
||||
|
||||
class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase):
|
||||
""" Tests for V1 login. """
|
||||
pass # No additional tests.
|
||||
|
||||
|
||||
class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase):
|
||||
""" Tests for V2 login. """
|
||||
|
||||
def test_validuser_unknownrepo(self):
|
||||
self.do_login('devtable', 'password', expect_success=False,
|
||||
scope='repository:invalidnamespace/simple:pull')
|
||||
|
||||
def test_validuser_unknownnamespacerepo(self):
|
||||
self.do_login('devtable', 'password', expect_success=True,
|
||||
scope='repository:devtable/newrepo:push')
|
||||
|
||||
def test_validuser_noaccess(self):
|
||||
self.do_login('public', 'password', expect_success=False,
|
||||
scope='repository:devtable/simple:pull')
|
||||
|
||||
def test_validuser_noscope(self):
|
||||
self.do_login('public', 'password', expect_success=True, scope=None)
|
||||
|
||||
def test_invaliduser_noscope(self):
|
||||
self.do_login('invaliduser', 'invalidpass', expected_code=401, scope=None)
|
||||
|
||||
def test_invalidpassword_noscope(self):
|
||||
self.do_login('public', 'invalidpass', expected_code=401, scope=None)
|
||||
|
||||
def test_oauth_noaccess(self):
|
||||
self.do_login('$oauthtoken', 'test', expect_success=False,
|
||||
scope='repository:public/publicrepo:pull,push')
|
||||
|
||||
def test_nouser_pull_publicrepo(self):
|
||||
self.do_login('', '', expect_success=True, scope='repository:public/publicrepo:pull')
|
||||
|
||||
def test_nouser_push_publicrepo(self):
|
||||
self.do_login('', '', expected_code=401, scope='repository:public/publicrepo:push')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Reference in a new issue