""" List and manage repository vulnerabilities and other sec information. """ import logging import features import json import requests from app import secscan_api from data import model from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param, RepositoryParamResource, resource, nickname, show_if, parse_args, query_param) logger = logging.getLogger(__name__) class SCAN_STATUS(object): """ Security scan status enum """ SCANNED = 'scanned' FAILED = 'failed' QUEUED = 'queued' def _call_security_api(relative_url, *args, **kwargs): """ Issues an HTTP call to the sec API at the given relative URL. """ try: response = secscan_api.call(relative_url, None, *args, **kwargs) except requests.exceptions.Timeout: raise DownstreamIssue(payload=dict(message='API call timed out')) except requests.exceptions.ConnectionError: raise DownstreamIssue(payload=dict(message='Could not connect to downstream service')) if response.status_code == 404: raise NotFound() try: response_data = json.loads(response.text) except ValueError: raise DownstreamIssue(payload=dict(message='Non-json response from downstream service')) if response.status_code / 100 != 2: logger.warning('Got %s status code to call: %s', response.status_code, response.text) raise DownstreamIssue(payload=dict(message=response_data['Message'])) return response_data def _get_status(repo_image): if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0: return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED return SCAN_STATUS.QUEUED @show_if(features.SECURITY_SCANNER) @resource('/v1/repository//image//vulnerabilities') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('imageid', 'The image ID') class RepositoryImageVulnerabilities(RepositoryParamResource): """ Operations for managing the vulnerabilities in a repository image. """ @require_repo_read @nickname('getRepoImageVulnerabilities') @parse_args @query_param('minimumPriority', 'Minimum vulnerability priority', type=str, default='Low') def get(self, args, namespace, repository, imageid): """ Fetches the vulnerabilities (if any) for a repository tag. """ repo_image = model.image.get_repo_image(namespace, repository, imageid) if repo_image is None: raise NotFound() if not repo_image.security_indexed: logger.debug('Image %s under repository %s/%s not security indexed', repo_image.docker_image_id, namespace, repository) return { 'status': _get_status(repo_image), } layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid) data = _call_security_api('layers/%s/vulnerabilities', layer_id, minimumPriority=args.minimumPriority) return { 'status': _get_status(repo_image), 'data': data, } @show_if(features.SECURITY_SCANNER) @resource('/v1/repository//image//packages') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('imageid', 'The image ID') class RepositoryImagePackages(RepositoryParamResource): """ Operations for listing the packages added/removed in an image. """ @require_repo_read @nickname('getRepoImagePackages') def get(self, namespace, repository, imageid): """ Fetches the packages added/removed in the given repo image. """ repo_image = model.image.get_repo_image(namespace, repository, imageid) if repo_image is None: raise NotFound() if not repo_image.security_indexed: return { 'status': _get_status(repo_image), } layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid) data = _call_security_api('layers/%s/packages', layer_id) return { 'status': _get_status(repo_image), 'data': data, }