Make public catalog only enabled via a feature flag
This commit is contained in:
parent
f44df49236
commit
555041876d
3 changed files with 53 additions and 30 deletions
|
@ -263,6 +263,10 @@ class DefaultConfig(ImmutableConfig):
|
|||
# Feature Flag: Whether to enable support for App repositories.
|
||||
FEATURE_APP_REGISTRY = False
|
||||
|
||||
# Feature Flag: If set to true, the _catalog endpoint returns public repositories. Otherwise,
|
||||
# only private repositories can be returned.
|
||||
FEATURE_PUBLIC_CATALOG = False
|
||||
|
||||
# The namespaces which should have the ability to enable signing
|
||||
SIGNING_NAMESPACE_WHITELIST = ['coreos', 'quay']
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import features
|
||||
|
||||
from flask import jsonify
|
||||
|
||||
from auth.registry_jwt_auth import process_registry_jwt_auth, get_granted_entity
|
||||
|
@ -15,8 +17,9 @@ def catalog_search(limit, offset, pagination_callback):
|
|||
if entity:
|
||||
username = entity.user.username
|
||||
|
||||
include_public = bool(features.PUBLIC_CATALOG)
|
||||
visible_repositories = model.get_visible_repositories(username, limit+1, offset,
|
||||
include_public=True)
|
||||
include_public=include_public)
|
||||
response = jsonify({
|
||||
'repositories': ['%s/%s' % (repo.namespace_name, repo.name)
|
||||
for repo in visible_repositories][0:limit],
|
||||
|
|
|
@ -1802,44 +1802,60 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix
|
|||
def test_one_five_blacklist(self):
|
||||
self.conduct('GET', '/v2/', expected_code=404, user_agent='Go 1.1 package http')
|
||||
|
||||
def test_catalog(self):
|
||||
def test_normal_catalog(self):
|
||||
# Look for public repositories and ensure all are public.
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) > 0)
|
||||
with TestFeature(self, 'PUBLIC_CATALOG', False):
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) == 0)
|
||||
|
||||
for reponame in data['repositories']:
|
||||
self.assertTrue(reponame.find('public/') == 0)
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
all_repos = []
|
||||
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
all_repos = []
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
all_repos.extend(data['repositories'])
|
||||
def test_public_catalog(self):
|
||||
# Look for public repositories and ensure all are public.
|
||||
with TestFeature(self, 'PUBLIC_CATALOG', True):
|
||||
response = self.conduct('GET', '/v2/_catalog')
|
||||
data = response.json()
|
||||
self.assertTrue(len(data['repositories']) > 0)
|
||||
|
||||
# Ensure we have a next link.
|
||||
self.assertIsNotNone(response.headers.get('Link'))
|
||||
for reponame in data['repositories']:
|
||||
self.assertTrue(reponame.find('public/') == 0)
|
||||
|
||||
# Request with the next link.
|
||||
while response.headers.get('Link'):
|
||||
link_url = response.headers.get('Link')[1:].split(';')[0][:-1]
|
||||
v2_index = link_url.find('/v2/')
|
||||
relative_url = link_url[v2_index:]
|
||||
# Perform auth and lookup the catalog again.
|
||||
self.do_auth('devtable', 'password', 'devtable', 'simple')
|
||||
all_repos = []
|
||||
|
||||
next_response = self.conduct('GET', relative_url, auth='jwt')
|
||||
next_data = next_response.json()
|
||||
all_repos.extend(next_data['repositories'])
|
||||
response = self.conduct('GET', '/v2/_catalog', params=dict(n=2), auth='jwt')
|
||||
data = response.json()
|
||||
self.assertEquals(len(data['repositories']), 2)
|
||||
all_repos.extend(data['repositories'])
|
||||
|
||||
self.assertTrue(len(next_data['repositories']) <= 2)
|
||||
self.assertNotEquals(next_data['repositories'], data['repositories'])
|
||||
response = next_response
|
||||
# Ensure we have a next link.
|
||||
self.assertIsNotNone(response.headers.get('Link'))
|
||||
|
||||
# Ensure the authed request has the public repository.
|
||||
public = [reponame for reponame in all_repos if reponame.find('/publicrepo') >= 0]
|
||||
self.assertTrue(bool(public))
|
||||
# Request with the next link.
|
||||
while response.headers.get('Link'):
|
||||
link_url = response.headers.get('Link')[1:].split(';')[0][:-1]
|
||||
v2_index = link_url.find('/v2/')
|
||||
relative_url = link_url[v2_index:]
|
||||
|
||||
next_response = self.conduct('GET', relative_url, auth='jwt')
|
||||
next_data = next_response.json()
|
||||
all_repos.extend(next_data['repositories'])
|
||||
|
||||
self.assertTrue(len(next_data['repositories']) <= 2)
|
||||
self.assertNotEquals(next_data['repositories'], data['repositories'])
|
||||
response = next_response
|
||||
|
||||
# Ensure the authed request has the public repository.
|
||||
public = [reponame for reponame in all_repos if reponame.find('/publicrepo') >= 0]
|
||||
self.assertTrue(bool(public))
|
||||
|
||||
|
||||
class V1PushV2PullRegistryTests(V2RegistryPullMixin, V1RegistryPushMixin, RegistryTestsMixin,
|
||||
|
|
Reference in a new issue