""" List and manage repository vulnerabilities and other security information. """

import logging

import features

from app import secscan_api
from data import model
from endpoints.api import (require_repo_read, path_param,
                           RepositoryParamResource, resource, nickname, show_if,
                           parse_args, query_param, truthy_bool)
from endpoints.exception import NotFound, DownstreamIssue
from endpoints.api.manifest import MANIFEST_DIGEST_ROUTE
from util.secscan.api import APIRequestFailure


logger = logging.getLogger(__name__)


class SCAN_STATUS(object):
    """ Security scan status enum """
    # Plain string constants (not enum.Enum members) because the values are
    # placed directly into the response dicts built below.
    SCANNED = 'scanned'
    FAILED = 'failed'
    QUEUED = 'queued'


def _get_status(repo_image):
    """ Returns the SCAN_STATUS value for the given image.

        An image whose `security_indexed_engine` is unset (None) or negative is
        reported as QUEUED. Otherwise the image is SCANNED when its
        `security_indexed` flag is truthy and FAILED when it is not.
    """
    engine = repo_image.security_indexed_engine
    if engine is None or engine < 0:
        return SCAN_STATUS.QUEUED

    if repo_image.security_indexed:
        return SCAN_STATUS.SCANNED

    return SCAN_STATUS.FAILED


def _security_status_for_image(namespace, repository, repo_image, include_vulnerabilities=True):
    """ Returns a dict representing the result of a call to the security status API for the given
        image.

        The dict always carries a 'status' key (a SCAN_STATUS value). When the image is marked as
        security indexed it also carries a 'data' key holding the layer data returned by the
        security scanner API: vulnerabilities when `include_vulnerabilities` is truthy, features
        otherwise.

        Raises DownstreamIssue if the security scanner API request fails, and NotFound if the API
        returns no data for the image.
    """
    if not repo_image.security_indexed:
        logger.debug('Image %s under repository %s/%s not security indexed',
                     repo_image.docker_image_id, namespace, repository)
        return {
            'status': _get_status(repo_image),
        }

    try:
        if include_vulnerabilities:
            data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True)
        else:
            data = secscan_api.get_layer_data(repo_image, include_features=True)
    except APIRequestFailure as arf:
        # `.message` is not a standard exception attribute on Python 3; fall back to the string
        # form of the exception so that building the error response cannot itself fail.
        raise DownstreamIssue({'message': getattr(arf, 'message', str(arf))})

    if data is None:
        raise NotFound()

    return {
        'status': _get_status(repo_image),
        'data': data,
    }


# NOTE(review): this route previously read '/v1/repository//image//security', i.e. it had no URL
# placeholders even though `get` below requires `repository` and `imageid` path arguments. The
# placeholders are restored here; the `apirepopath` converter name is assumed to be the project's
# repository-path converter -- confirm against the other API routes.
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImageSecurity(RepositoryParamResource):
    """ Operations for managing the vulnerabilities in a repository image. """

    @require_repo_read
    @nickname('getRepoImageSecurity')
    @parse_args()
    @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
                 default=False)
    def get(self, namespace, repository, imageid, parsed_args):
        """ Fetches the features and vulnerabilities (if any) for a repository image. """
        repo_image = model.image.get_repo_image(namespace, repository, imageid)
        if repo_image is None:
            raise NotFound()

        return _security_status_for_image(namespace, repository, repo_image,
                                          parsed_args.vulnerabilities)


@show_if(features.SECURITY_SCANNER)
@resource(MANIFEST_DIGEST_ROUTE + '/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('manifestref', 'The digest of the manifest')
class RepositoryManifestSecurity(RepositoryParamResource):
    """ Operations for managing the vulnerabilities in a repository manifest. """

    @require_repo_read
    @nickname('getRepoManifestSecurity')
    @parse_args()
    @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
                 default=False)
    def get(self, namespace, repository, manifestref, parsed_args):
        """ Fetches the features and vulnerabilities (if any) for the image referenced by the
            manifest with the given digest.
        """
        try:
            tag_manifest = model.tag.load_manifest_by_digest(namespace, repository, manifestref)
        except model.DataModelException:
            # A manifest that cannot be loaded is reported to the caller as a missing resource.
            raise NotFound()

        # The security status is computed for the image of the tag the manifest belongs to.
        repo_image = tag_manifest.tag.image
        return _security_status_for_image(namespace, repository, repo_image,
                                          parsed_args.vulnerabilities)