Merge pull request #3122 from quay/joseph.schorr/QUAY-975/disabled-routes

Audit which routes needed to be disabled for disabled users
This commit is contained in:
Joseph Schorr 2018-06-22 11:34:22 -04:00 committed by GitHub
commit 92aaa23669
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 2 deletions

View file

@ -204,6 +204,10 @@ def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None):
get_authenticated_user())
abort(405)
# Make sure the repo's namespace isn't disabled.
if not model.is_namespace_enabled(namespace):
abort(400)
# If there is a data checker, call it first.
if checker is not None:
if not checker(tag_image):

View file

@ -152,3 +152,8 @@ class VerbsDataInterface(object):
or None if none.
"""
pass
@abstractmethod
def is_namespace_enabled(self, namespace_name):
""" Returns whether the given namespace exists and is enabled. """
pass

View file

@ -141,6 +141,10 @@ class PreOCIModel(VerbsDataInterface):
v1_metadata=_docker_v1_metadata(namespace_name, repo_name, found),
internal_db_id=found.id,)
def is_namespace_enabled(self, namespace_name):
namespace = model.user.get_namespace_user(namespace_name)
return namespace is not None and namespace.enabled
pre_oci_model = PreOCIModel()

View file

@ -16,6 +16,8 @@ class V2ProtocolSteps(Enum):
GET_MANIFEST = 'get-manifest'
PUT_MANIFEST = 'put-manifest'
MOUNT_BLOB = 'mount-blob'
CATALOG = 'catalog'
LIST_TAGS = 'list-tags'
class V2Protocol(RegistryProtocol):
@ -374,7 +376,8 @@ class V2Protocol(RegistryProtocol):
params['n'] = page_size
while True:
response = self.conduct(session, 'GET', url, headers=headers, params=params)
response = self.conduct(session, 'GET', url, headers=headers, params=params,
expected_status=(200, expected_failure, V2ProtocolSteps.LIST_TAGS))
data = response.json()
assert len(data['tags']) <= page_size
@ -421,7 +424,8 @@ class V2Protocol(RegistryProtocol):
params['n'] = page_size
while True:
response = self.conduct(session, 'GET', url, headers=headers, params=params)
response = self.conduct(session, 'GET', url, headers=headers, params=params,
expected_status=(200, expected_failure, V2ProtocolSteps.CATALOG))
data = response.json()
assert len(data['repositories']) <= page_size

View file

@ -739,6 +739,21 @@ def test_catalog_caching(v2_protocol, basic_images, liveserver_session, app_relo
assert set(cached_results) == set(results)
def test_catalog_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader,
liveserver, registry_server_executor):
credentials = ('devtable', 'password')
# Get a valid token.
token, _ = v2_protocol.auth(liveserver_session, credentials, 'devtable', 'simple')
# Disable the devtable namespace.
registry_server_executor.on(liveserver).disable_namespace('devtable')
# Try to retrieve the catalog and ensure it fails to return any results.
results = v2_protocol.catalog(liveserver_session, bearer_token=token)
assert len(results) == 0
@pytest.mark.parametrize('username, namespace, repository', [
('devtable', 'devtable', 'simple'),
('devtable', 'devtable', 'gargantuan'),
@ -764,6 +779,18 @@ def test_tags(username, namespace, repository, page_size, v2_protocol, liveserve
assert set([r for r in results]) == set(expected_tags)
def test_tags_disabled_namespace(v2_protocol, basic_images, liveserver_session, app_reloader,
liveserver, registry_server_executor):
credentials = ('devtable', 'password')
# Disable the buynlarge namespace.
registry_server_executor.on(liveserver).disable_namespace('buynlarge')
# Try to retrieve the tags and ensure it fails.
v2_protocol.tags(liveserver_session, credentials=credentials, namespace='buynlarge',
repo_name='orgrepo', expected_failure=Failures.NAMESPACE_DISABLED)
def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
registry_server_executor, app_reloader):
""" Test: Retrieve a torrent for pulling the image via the Quay CLI. """
@ -799,6 +826,40 @@ def test_pull_torrent(pusher, basic_images, liveserver_session, liveserver,
assert expected == found
def test_squashed_image_disabled_namespace(pusher, sized_images, liveserver_session,
liveserver, registry_server_executor, app_reloader):
""" Test: Attempting to pull a squashed image from a disabled namespace. """
credentials = ('devtable', 'password')
# Push an image to download.
pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images,
credentials=credentials)
# Disable the buynlarge namespace.
registry_server_executor.on(liveserver).disable_namespace('buynlarge')
# Attempt to pull the squashed version.
response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials)
assert response.status_code == 400
def test_squashed_image_disabled_user(pusher, sized_images, liveserver_session,
liveserver, registry_server_executor, app_reloader):
""" Test: Attempting to pull a squashed image via a disabled user. """
credentials = ('devtable', 'password')
# Push an image to download.
pusher.push(liveserver_session, 'buynlarge', 'newrepo', 'latest', sized_images,
credentials=credentials)
# Disable the devtable namespace.
registry_server_executor.on(liveserver).disable_namespace('devtable')
# Attempt to pull the squashed version.
response = liveserver_session.get('/c1/squash/buynlarge/newrepo/latest', auth=credentials)
assert response.status_code == 403
@pytest.mark.parametrize('use_estimates', [
False,
True,