""" List and manage repository vulnerabilities and other sec information. """ import logging import features import requests import json from urlparse import urljoin from app import app from data import model from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param, RepositoryParamResource, resource, nickname, show_if, parse_args, query_param) logger = logging.getLogger(__name__) def _call_security_api(relative_url, *args, **kwargs): """ Issues an HTTP call to the sec API at the given relative URL. """ url = urljoin(app.config['SECURITY_SCANNER']['ENDPOINT'], relative_url % args) client = app.config['HTTPCLIENT'] timeout = app.config['SECURITY_SCANNER'].get('API_CALL_TIMEOUT', 1) logger.debug('Looking up sec information: %s', url) try: response = client.get(url, params=kwargs, timeout=timeout) except requests.exceptions.Timeout: raise DownstreamIssue(payload=dict(message='API call timed out')) except requests.exceptions.ConnectionError: raise DownstreamIssue(payload=dict(message='Could not connect to downstream service')) if response.status_code == 404: raise NotFound() try: response_data = json.loads(response.text) except ValueError: raise DownstreamIssue(payload=dict(message='Non-json response from downstream service')) if response.status_code / 100 != 2: logger.warning('Got %s status code to call %s: %s', response.status_code, url, response.text) raise DownstreamIssue(payload=dict(message=response_data['Message'])) return response_data @show_if(features.SECURITY_SCANNER) @resource('/v1/repository//tag//vulnerabilities') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('tag', 'The name of the tag') class RepositoryTagVulnerabilities(RepositoryParamResource): """ Operations for managing the vulnerabilities in a repository tag. """ @require_repo_read @nickname('getRepoTagVulnerabilities') @parse_args @query_param('minimumPriority', 'Minimum vulnerability priority', type=str, default='Low') def get(self, args, namespace, repository, tag): """ Fetches the vulnerabilities (if any) for a repository tag. """ try: tag_image = model.tag.get_tag_image(namespace, repository, tag) except model.DataModelException: raise NotFound() if not tag_image.security_indexed: logger.debug('Image %s for tag %s under repository %s/%s not security indexed', tag_image.docker_image_id, tag, namespace, repository) return { 'security_indexed': False } data = _call_security_api('/layers/%s/vulnerabilities', tag_image.docker_image_id, minimumPriority=args.minimumPriority) return { 'security_indexed': True, 'data': data, } @show_if(features.SECURITY_SCANNER) @resource('/v1/repository//image//packages') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('imageid', 'The image ID') class RepositoryImagePackages(RepositoryParamResource): """ Operations for listing the packages added/removed in an image. """ @require_repo_read @nickname('getRepoImagePackages') def get(self, namespace, repository, imageid): """ Fetches the packages added/removed in the given repo image. """ repo_image = model.image.get_repo_image(namespace, repository, imageid) if repo_image is None: raise NotFound() if not repo_image.security_indexed: return { 'security_indexed': False } data = _call_security_api('/layers/%s/packages', repo_image.docker_image_id) return { 'security_indexed': True, 'data': data, }