Add API endpoint for retrieving security status by *manifest*, rather than Docker V1 image ID

This commit is contained in:
Joseph Schorr 2017-02-02 17:51:18 -05:00
parent 0150abc488
commit cf539487a1
4 changed files with 107 additions and 33 deletions

View file

@ -9,6 +9,7 @@ from endpoints.api import (require_repo_read, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args, RepositoryParamResource, resource, nickname, show_if, parse_args,
query_param, truthy_bool) query_param, truthy_bool)
from endpoints.exception import NotFound, DownstreamIssue from endpoints.exception import NotFound, DownstreamIssue
from endpoints.api.manifest import MANIFEST_DIGEST_ROUTE
from util.secscan.api import APIRequestFailure from util.secscan.api import APIRequestFailure
@ -23,11 +24,39 @@ class SCAN_STATUS(object):
def _get_status(repo_image): def _get_status(repo_image):
""" Returns the SCAN_STATUS value for the given image. """
if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0: if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0:
return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED
return SCAN_STATUS.QUEUED return SCAN_STATUS.QUEUED
def _security_status_for_image(namespace, repository, repo_image, include_vulnerabilities=True):
""" Returns a dict representing the result of a call to the security status API for the given
image.
"""
if not repo_image.security_indexed:
logger.debug('Image %s under repository %s/%s not security indexed',
repo_image.docker_image_id, namespace, repository)
return {
'status': _get_status(repo_image),
}
try:
if include_vulnerabilities:
data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True)
else:
data = secscan_api.get_layer_data(repo_image, include_features=True)
except APIRequestFailure as arf:
raise DownstreamIssue({'message': arf.message})
if data is None:
raise NotFound()
return {
'status': _get_status(repo_image),
'data': data,
}
@show_if(features.SECURITY_SCANNER) @show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security') @resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security')
@ -47,25 +76,28 @@ class RepositoryImageSecurity(RepositoryParamResource):
if repo_image is None: if repo_image is None:
raise NotFound() raise NotFound()
if not repo_image.security_indexed: return _security_status_for_image(namespace, repository, repo_image,
logger.debug('Image %s under repository %s/%s not security indexed', parsed_args.vulnerabilities)
repo_image.docker_image_id, namespace, repository)
return {
'status': _get_status(repo_image),
}
@show_if(features.SECURITY_SCANNER)
@resource(MANIFEST_DIGEST_ROUTE + '/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('manifestref', 'The digest of the manifest')
class RepositoryManifestSecurity(RepositoryParamResource):
""" Operations for managing the vulnerabilities in a repository manifest. """
@require_repo_read
@nickname('getRepoManifestSecurity')
@parse_args()
@query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
default=False)
def get(self, namespace, repository, manifestref, parsed_args):
try: try:
if parsed_args.vulnerabilities: tag_manifest = model.tag.load_manifest_by_digest(namespace, repository, manifestref)
data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True) except model.DataModelException:
else:
data = secscan_api.get_layer_data(repo_image, include_features=True)
except APIRequestFailure as arf:
raise DownstreamIssue({'message': arf.message})
if data is None:
raise NotFound() raise NotFound()
return { repo_image = tag_manifest.tag.image
'status': _get_status(repo_image),
'data': data, return _security_status_for_image(namespace, repository, repo_image,
} parsed_args.vulnerabilities)

View file

@ -10,8 +10,12 @@ wellknown = Blueprint('wellknown', __name__)
@wellknown.route('/app-capabilities', methods=['GET']) @wellknown.route('/app-capabilities', methods=['GET'])
def app_capabilities(): def app_capabilities():
view_image_tmpl = '%s/{namespace}/{reponame}:{tag}' % get_app_url() view_image_tmpl = '%s/{namespace}/{reponame}:{tag}' % get_app_url()
image_security_tmpl = ('%s/api/v1/repository/{namespace}/{reponame}/image/{imageid}/security' %
get_app_url()) image_security = '%s/api/v1/repository/{namespace}/{reponame}/image/{imageid}/security'
image_security_tmpl = image_security % get_app_url()
manifest_security = '%s/api/v1/repository/{namespace}/{reponame}/manifest/{digest}/security'
manifest_security_tmpl = manifest_security % get_app_url()
metadata = { metadata = {
'appName': 'io.quay', 'appName': 'io.quay',
@ -23,6 +27,10 @@ def app_capabilities():
'io.quay.image-security': { 'io.quay.image-security': {
'rest-api-template': image_security_tmpl, 'rest-api-template': image_security_tmpl,
}, },
'io.quay.manifest-security': {
'rest-api-template': manifest_security_tmpl,
},
}, },
} }

View file

@ -56,7 +56,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana
SuperUserCustomCertificate, SuperUserRepositoryBuildLogs, SuperUserCustomCertificate, SuperUserRepositoryBuildLogs,
SuperUserRepositoryBuildResource, SuperUserRepositoryBuildStatus) SuperUserRepositoryBuildResource, SuperUserRepositoryBuildStatus)
from endpoints.api.globalmessages import GlobalUserMessage, GlobalUserMessages from endpoints.api.globalmessages import GlobalUserMessage, GlobalUserMessages
from endpoints.api.secscan import RepositoryImageSecurity from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
@ -4522,6 +4522,23 @@ class TestRepositoryImageSecurity(ApiTestCase):
self._run_test('GET', 404, 'devtable', None) self._run_test('GET', 404, 'devtable', None)
class TestRepositoryManifestSecurity(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(RepositoryManifestSecurity, repository='devtable/simple', manifestref='sha256:abcd')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 404, 'devtable', None)
class TestRepositoryManifestLabels(ApiTestCase): class TestRepositoryManifestLabels(ApiTestCase):
def setUp(self): def setUp(self):
ApiTestCase.setUp(self) ApiTestCase.setUp(self)

View file

@ -70,7 +70,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana
SuperUserServiceKeyApproval, SuperUserTakeOwnership, SuperUserServiceKeyApproval, SuperUserTakeOwnership,
SuperUserCustomCertificates, SuperUserCustomCertificate) SuperUserCustomCertificates, SuperUserCustomCertificate)
from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,) from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,)
from endpoints.api.secscan import RepositoryImageSecurity from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser) SuperUserCreateInitialSuperUser)
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
@ -4257,14 +4257,24 @@ class TestRepositoryImageSecurity(ApiTestCase):
def test_get_vulnerabilities(self): def test_get_vulnerabilities(self):
self.login(ADMIN_ACCESS_USER) self.login(ADMIN_ACCESS_USER)
tag = model.tag.get_active_tag(ADMIN_ACCESS_USER, 'simple', 'latest')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest') layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
tag_manifest = database.TagManifest.get(tag=tag)
# Grab the security info for the tag. It should be queued. # Grab the security info for the tag. It should be queued.
response = self.getJsonResponse(RepositoryImageSecurity, manifest_response = self.getJsonResponse(RepositoryManifestSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, manifestref=tag_manifest.digest,
vulnerabilities='true')) vulnerabilities='true'))
self.assertEquals('queued', response['status'])
image_response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals(manifest_response, image_response)
self.assertEquals('queued', image_response['status'])
# Mark the layer as indexed. # Mark the layer as indexed.
layer.security_indexed = True layer.security_indexed = True
@ -4275,12 +4285,19 @@ class TestRepositoryImageSecurity(ApiTestCase):
with fake_security_scanner() as security_scanner: with fake_security_scanner() as security_scanner:
security_scanner.add_layer(security_scanner.layer_id(layer)) security_scanner.add_layer(security_scanner.layer_id(layer))
response = self.getJsonResponse(RepositoryImageSecurity, manifest_response = self.getJsonResponse(RepositoryManifestSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, manifestref=tag_manifest.digest,
vulnerabilities='true')) vulnerabilities='true'))
self.assertEquals('scanned', response['status'])
self.assertEquals(1, response['data']['Layer']['IndexedByVersion']) image_response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals(manifest_response, image_response)
self.assertEquals('scanned', image_response['status'])
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
class TestSuperUserCustomCertificates(ApiTestCase): class TestSuperUserCustomCertificates(ApiTestCase):