diff --git a/endpoints/api/secscan.py b/endpoints/api/secscan.py index f8727140c..7295e6604 100644 --- a/endpoints/api/secscan.py +++ b/endpoints/api/secscan.py @@ -9,6 +9,7 @@ from endpoints.api import (require_repo_read, path_param, RepositoryParamResource, resource, nickname, show_if, parse_args, query_param, truthy_bool) from endpoints.exception import NotFound, DownstreamIssue +from endpoints.api.manifest import MANIFEST_DIGEST_ROUTE from util.secscan.api import APIRequestFailure @@ -23,11 +24,39 @@ class SCAN_STATUS(object): def _get_status(repo_image): + """ Returns the SCAN_STATUS value for the given image. """ if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0: return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED return SCAN_STATUS.QUEUED +def _security_status_for_image(namespace, repository, repo_image, include_vulnerabilities=True): + """ Returns a dict representing the result of a call to the security status API for the given + image. + """ + if not repo_image.security_indexed: + logger.debug('Image %s under repository %s/%s not security indexed', + repo_image.docker_image_id, namespace, repository) + return { + 'status': _get_status(repo_image), + } + + try: + if include_vulnerabilities: + data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True) + else: + data = secscan_api.get_layer_data(repo_image, include_features=True) + except APIRequestFailure as arf: + raise DownstreamIssue({'message': arf.message}) + + if data is None: + raise NotFound() + + return { + 'status': _get_status(repo_image), + 'data': data, + } + @show_if(features.SECURITY_SCANNER) @resource('/v1/repository//image//security') @@ -47,25 +76,28 @@ class RepositoryImageSecurity(RepositoryParamResource): if repo_image is None: raise NotFound() - if not repo_image.security_indexed: - logger.debug('Image %s under repository %s/%s not security indexed', - repo_image.docker_image_id, namespace, repository) - return { - 'status': _get_status(repo_image), - } + return _security_status_for_image(namespace, repository, repo_image, + parsed_args.vulnerabilities) +@show_if(features.SECURITY_SCANNER) +@resource(MANIFEST_DIGEST_ROUTE + '/security') +@path_param('repository', 'The full path of the repository. e.g. namespace/name') +@path_param('manifestref', 'The digest of the manifest') +class RepositoryManifestSecurity(RepositoryParamResource): + """ Operations for managing the vulnerabilities in a repository manifest. """ + + @require_repo_read + @nickname('getRepoManifestSecurity') + @parse_args() + @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool, + default=False) + def get(self, namespace, repository, manifestref, parsed_args): try: - if parsed_args.vulnerabilities: - data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True) - else: - data = secscan_api.get_layer_data(repo_image, include_features=True) - except APIRequestFailure as arf: - raise DownstreamIssue({'message': arf.message}) - - if data is None: + tag_manifest = model.tag.load_manifest_by_digest(namespace, repository, manifestref) + except model.DataModelException: raise NotFound() - return { - 'status': _get_status(repo_image), - 'data': data, - } + repo_image = tag_manifest.tag.image + + return _security_status_for_image(namespace, repository, repo_image, + parsed_args.vulnerabilities) diff --git a/endpoints/wellknown.py b/endpoints/wellknown.py index c1a86d18d..fc580d6b4 100644 --- a/endpoints/wellknown.py +++ b/endpoints/wellknown.py @@ -10,8 +10,12 @@ wellknown = Blueprint('wellknown', __name__) @wellknown.route('/app-capabilities', methods=['GET']) def app_capabilities(): view_image_tmpl = '%s/{namespace}/{reponame}:{tag}' % get_app_url() - image_security_tmpl = ('%s/api/v1/repository/{namespace}/{reponame}/image/{imageid}/security' % - get_app_url()) + + image_security = '%s/api/v1/repository/{namespace}/{reponame}/image/{imageid}/security' + image_security_tmpl = image_security % get_app_url() + + manifest_security = '%s/api/v1/repository/{namespace}/{reponame}/manifest/{digest}/security' + manifest_security_tmpl = manifest_security % get_app_url() metadata = { 'appName': 'io.quay', @@ -23,6 +27,10 @@ def app_capabilities(): 'io.quay.image-security': { 'rest-api-template': image_security_tmpl, }, + + 'io.quay.manifest-security': { + 'rest-api-template': manifest_security_tmpl, + }, }, } diff --git a/test/test_api_security.py b/test/test_api_security.py index 78ef90308..fef2b4527 100644 --- a/test/test_api_security.py +++ b/test/test_api_security.py @@ -56,7 +56,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana SuperUserCustomCertificate, SuperUserRepositoryBuildLogs, SuperUserRepositoryBuildResource, SuperUserRepositoryBuildStatus) from endpoints.api.globalmessages import GlobalUserMessage, GlobalUserMessages -from endpoints.api.secscan import RepositoryImageSecurity +from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel @@ -4522,6 +4522,23 @@ class TestRepositoryImageSecurity(ApiTestCase): self._run_test('GET', 404, 'devtable', None) +class TestRepositoryManifestSecurity(ApiTestCase): + def setUp(self): + ApiTestCase.setUp(self) + self._set_url(RepositoryManifestSecurity, repository='devtable/simple', manifestref='sha256:abcd') + + def test_get_anonymous(self): + self._run_test('GET', 401, None, None) + + def test_get_freshuser(self): + self._run_test('GET', 403, 'freshuser', None) + + def test_get_reader(self): + self._run_test('GET', 403, 'reader', None) + + def test_get_devtable(self): + self._run_test('GET', 404, 'devtable', None) + class TestRepositoryManifestLabels(ApiTestCase): def setUp(self): ApiTestCase.setUp(self) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index a3cdf7eb0..d5a0f5b4c 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -70,7 +70,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana SuperUserServiceKeyApproval, SuperUserTakeOwnership, SuperUserCustomCertificates, SuperUserCustomCertificate) from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,) -from endpoints.api.secscan import RepositoryImageSecurity +from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, SuperUserCreateInitialSuperUser) from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel @@ -4257,14 +4257,24 @@ class TestRepositoryImageSecurity(ApiTestCase): def test_get_vulnerabilities(self): self.login(ADMIN_ACCESS_USER) + tag = model.tag.get_active_tag(ADMIN_ACCESS_USER, 'simple', 'latest') layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest') + tag_manifest = database.TagManifest.get(tag=tag) + # Grab the security info for the tag. It should be queued. - response = self.getJsonResponse(RepositoryImageSecurity, - params=dict(repository=ADMIN_ACCESS_USER + '/simple', - imageid=layer.docker_image_id, - vulnerabilities='true')) - self.assertEquals('queued', response['status']) + manifest_response = self.getJsonResponse(RepositoryManifestSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + manifestref=tag_manifest.digest, + vulnerabilities='true')) + + image_response = self.getJsonResponse(RepositoryImageSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + imageid=layer.docker_image_id, + vulnerabilities='true')) + + self.assertEquals(manifest_response, image_response) + self.assertEquals('queued', image_response['status']) # Mark the layer as indexed. layer.security_indexed = True @@ -4275,12 +4285,19 @@ class TestRepositoryImageSecurity(ApiTestCase): with fake_security_scanner() as security_scanner: security_scanner.add_layer(security_scanner.layer_id(layer)) - response = self.getJsonResponse(RepositoryImageSecurity, - params=dict(repository=ADMIN_ACCESS_USER + '/simple', - imageid=layer.docker_image_id, - vulnerabilities='true')) - self.assertEquals('scanned', response['status']) - self.assertEquals(1, response['data']['Layer']['IndexedByVersion']) + manifest_response = self.getJsonResponse(RepositoryManifestSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + manifestref=tag_manifest.digest, + vulnerabilities='true')) + + image_response = self.getJsonResponse(RepositoryImageSecurity, + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + imageid=layer.docker_image_id, + vulnerabilities='true')) + + self.assertEquals(manifest_response, image_response) + self.assertEquals('scanned', image_response['status']) + self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion']) class TestSuperUserCustomCertificates(ApiTestCase):