diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index db30f6c23..e339aaed3 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -78,10 +78,16 @@ def generate_registry_jwt(): if not REPOSITORY_NAME_REGEX.match(reponame): abort(400) - if 'pull' in actions and 'push' in actions: + final_actions = [] + + if 'push' in actions: + # If there is no valid user or token, then the repository cannot be + # accessed. if user is None and token is None: abort(401) + # Lookup the repository. If it exists, make sure the entity has modify + # permission. Otherwise, make sure the entity has create permission. repo = model.repository.get_repository(namespace, reponame) if repo: if not ModifyRepositoryPermission(namespace, reponame).can(): @@ -92,15 +98,25 @@ def generate_registry_jwt(): logger.debug('Creating repository: %s/%s', namespace, reponame) model.repository.create_repository(namespace, reponame, user) - elif 'pull' in actions: - if (not ReadRepositoryPermission(namespace, reponame).can() and - not model.repository.repository_is_public(namespace, reponame)): + + final_actions.append('push') + + if 'pull' in actions: + # Grant pull if the user can read the repo or it is public. We also + # grant it if the user already has push, as they can clearly change + # the repository. + if (ReadRepositoryPermission(namespace, reponame).can() or + model.repository.repository_is_public(namespace, reponame) or + 'push' in final_actions): + final_actions.append('pull') + else: abort(403) + access.append({ 'type': 'repository', 'name': namespace_and_repo, - 'actions': actions, + 'actions': final_actions, }) elif user is None and token is None: diff --git a/test/registry_tests.py b/test/registry_tests.py index 46f20692e..74220c679 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -180,7 +180,8 @@ class RegistryTestCaseMixin(LiveServerTestCase): class BaseRegistryMixin(object): - def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200): + def conduct(self, method, url, headers=None, data=None, auth=None, params=None, expected_code=200, + json_data=None): params = params or {} params['_csrf_token'] = self.csrf_token @@ -199,6 +200,10 @@ class BaseRegistryMixin(object): elif auth: auth_tuple = auth + if json_data is not None: + data = json.dumps(json_data) + headers['Content-Type'] = 'application/json' + response = self.session.request(method, self.get_server_url() + url, headers=headers, data=data, auth=auth_tuple, params=params) if response.status_code != expected_code: @@ -386,14 +391,15 @@ class V2RegistryMixin(BaseRegistryMixin): class V2RegistryPushMixin(V2RegistryMixin): def do_push(self, namespace, repository, username, password, images=None, tag_name=None, - cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200): + cancel=False, invalid=False, expected_manifest_code=202, expected_auth_code=200, + scopes=None): images = images or self._get_default_images() # Ping! self.v2_ping() # Auth. - self.do_auth(username, password, namespace, repository, scopes=['push', 'pull'], + self.do_auth(username, password, namespace, repository, scopes=scopes or ['push', 'pull'], expected_code=expected_auth_code) if expected_auth_code != 200: @@ -722,6 +728,24 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V2 registry. """ + def test_push_only_push_scope(self): + images = [{ + 'id': 'onlyimagehere', + 'contents': 'foobar', + }] + + self.do_push('devtable', 'somenewrepo', 'devtable', 'password', images, + scopes=['push']) + + def test_attempt_push_only_push_scope(self): + images = [{ + 'id': 'onlyimagehere', + 'contents': 'foobar', + }] + + self.do_push('public', 'somenewrepo', 'devtable', 'password', images, + scopes=['push'], expected_auth_code=403) + def test_push_reponame_with_slashes(self): # Attempt to add a repository name with slashes. This should fail as we do not support it. images = [{ @@ -736,7 +760,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix def test_cancel_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True) - def test_pull_by_checksum(self): # Add a new repository under the user, so we have a real repository to pull. _, digest = self.do_push('devtable', 'newrepo', 'devtable', 'password') @@ -913,7 +936,6 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix self.conduct('GET', '/v2/devtable/doesnotexist/tags/list', auth='jwt', expected_code=401) - class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin, RegistryTestCaseMixin, LiveServerTestCase): """ Tests for V1 push, V2 pull registry. """ @@ -968,6 +990,114 @@ class VerbTests(RegistryTestCaseMixin, V1RegistryPushMixin, LiveServerTestCase): image_name = layer_tar.extractfile(layer_tar.getmember('image_name')).read() self.assertEquals('latestid', image_name) +class V1RegistryLoginMixin(object): + def do_login(self, username, password, scope, expect_success=True): + data = { + 'username': username, + 'password': password, + } + + response = self.conduct('POST', '/v1/users/', json_data=data, expected_code=400) + if expect_success: + self.assertEquals(response.text, '"Username or email already exists"') + else: + self.assertNotEquals(response.text, '"Username or email already exists"') + +class V2RegistryLoginMixin(object): + def do_login(self, username, password, scope, expect_success=True, expected_code=None): + params = { + 'account': username, + 'scope': scope, + 'service': app.config['SERVER_HOSTNAME'], + } + + if expected_code is None: + if expect_success: + expected_code = 200 + else: + expected_code = 403 + + response = self.conduct('GET', '/v2/auth', params=params, auth=(username, password), + expected_code=expected_code) + return response + + +class LoginTests(object): + """ Generic tests for registry login. """ + def test_invalid_username_knownrepo(self): + self.do_login('invaliduser', 'somepassword', expect_success=False, + scope='repository:devtable/simple:pull') + + def test_invalid_password_knownrepo(self): + self.do_login('devtable', 'somepassword', expect_success=False, + scope='repository:devtable/simple:pull') + + def test_validuser_knownrepo(self): + self.do_login('devtable', 'password', expect_success=True, + scope='repository:devtable/simple:pull') + + def test_validuser_encryptedpass(self): + # Generate an encrypted password. + self.conduct_api_login('devtable', 'password') + resp = self.conduct('POST', '/api/v1/user/clientkey', json_data=dict(password='password')) + + encryptedpassword = resp.json()['key'] + self.do_login('devtable', encryptedpassword, expect_success=True, + scope='repository:devtable/simple:pull') + + def test_robotkey(self): + # Lookup the robot's password. + self.conduct_api_login('devtable', 'password') + resp = self.conduct('GET', '/api/v1/user/robots/dtrobot') + robot_token = resp.json()['token'] + + self.do_login('devtable+dtrobot', robot_token, expect_success=True, + scope='repository:devtable/complex:pull') + + def test_oauth(self): + self.do_login('$oauthtoken', 'test', expect_success=True, + scope='repository:devtable/complex:pull') + + +class V1LoginTests(V1RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): + """ Tests for V1 login. """ + pass # No additional tests. + + +class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, BaseRegistryMixin, LiveServerTestCase): + """ Tests for V2 login. """ + + def test_validuser_unknownrepo(self): + self.do_login('devtable', 'password', expect_success=False, + scope='repository:invalidnamespace/simple:pull') + + def test_validuser_unknownnamespacerepo(self): + self.do_login('devtable', 'password', expect_success=True, + scope='repository:devtable/newrepo:push') + + def test_validuser_noaccess(self): + self.do_login('public', 'password', expect_success=False, + scope='repository:devtable/simple:pull') + + def test_validuser_noscope(self): + self.do_login('public', 'password', expect_success=True, scope=None) + + def test_invaliduser_noscope(self): + self.do_login('invaliduser', 'invalidpass', expected_code=401, scope=None) + + def test_invalidpassword_noscope(self): + self.do_login('public', 'invalidpass', expected_code=401, scope=None) + + def test_oauth_noaccess(self): + self.do_login('$oauthtoken', 'test', expect_success=False, + scope='repository:public/publicrepo:pull,push') + + def test_nouser_pull_publicrepo(self): + self.do_login('', '', expect_success=True, scope='repository:public/publicrepo:pull') + + def test_nouser_push_publicrepo(self): + self.do_login('', '', expected_code=401, scope='repository:public/publicrepo:push') + if __name__ == '__main__': unittest.main()