This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/secscan.py

105 lines
3.6 KiB
Python
Raw Normal View History

""" List and manage repository vulnerabilities and other sec information. """
import logging
import features
import json
import requests
from app import secscan_api
from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args,
query_param)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _call_security_api(relative_url, *args, **kwargs):
    """ Issues an HTTP call to the sec API at the given relative URL.

    Any extra positional and keyword arguments are forwarded unchanged to
    ``secscan_api.call``, after ``relative_url`` and a literal ``None``.

    Returns the JSON-decoded body of a 2xx response.

    Raises:
      NotFound: the downstream service answered with a 404.
      DownstreamIssue: the call timed out, the connection failed, the body was
        not valid JSON, or the status code was outside the 2xx range.
    """
    try:
        # NOTE(review): the meaning of the ``None`` second argument is defined by
        # ``secscan_api.call``, which is not visible from this module.
        response = secscan_api.call(relative_url, None, *args, **kwargs)
    except requests.exceptions.Timeout:
        raise DownstreamIssue(payload=dict(message='API call timed out'))
    except requests.exceptions.ConnectionError:
        raise DownstreamIssue(payload=dict(message='Could not connect to downstream service'))

    # A 404 is reported before the body is parsed, so its body need not be JSON.
    if response.status_code == 404:
        raise NotFound()

    try:
        response_data = json.loads(response.text)
    except ValueError:
        raise DownstreamIssue(payload=dict(message='Non-json response from downstream service'))

    # Floor division keeps the "is this a 2xx?" check an integer comparison on
    # both Python 2 and Python 3. With true division 201 / 100 is 2.01, which
    # would wrongly classify a successful response as a failure.
    if response.status_code // 100 != 2:
        logger.warning('Got %s status code to call: %s', response.status_code, response.text)

        # An error body is not guaranteed to be an object carrying a 'Message'
        # key; fall back to a generic message instead of failing with an
        # unhandled KeyError/TypeError.
        message = None
        if isinstance(response_data, dict):
            message = response_data.get('Message')
        if message is None:
            message = 'Unexpected %s response from downstream service' % response.status_code
        raise DownstreamIssue(payload=dict(message=message))

    return response_data
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/image/<imageid>/vulnerabilities')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImageVulnerabilities(RepositoryParamResource):
    """ Resource for the vulnerabilities found in a repository image. """

    @require_repo_read
    @nickname('getRepoImageVulnerabilities')
    @parse_args
    @query_param('minimumPriority', 'Minimum vulnerability priority', type=str, default='Low')
    def get(self, args, namespace, repository, imageid):
        """ Fetches the vulnerabilities (if any) for a repository image. """
        image = model.image.get_repo_image(namespace, repository, imageid)
        if image is None:
            raise NotFound()

        if image.security_indexed:
            # The layer is addressed as "<docker image id>.<storage uuid>".
            docker_id = image.docker_image_id
            storage_uuid = image.storage.uuid
            layer = '%s.%s' % (docker_id, storage_uuid)

            # The minimumPriority query argument is handed on as a keyword argument.
            vulnerabilities = _call_security_api(
                'layers/%s/vulnerabilities',
                layer,
                minimumPriority=args.minimumPriority,
            )
            return {'security_indexed': True, 'data': vulnerabilities}

        # Without the security_indexed flag only the flag itself is returned.
        logger.debug(
            'Image %s under repository %s/%s not security indexed',
            image.docker_image_id,
            namespace,
            repository,
        )
        return {'security_indexed': False}
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/image/<imageid>/packages')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImagePackages(RepositoryParamResource):
    """ Resource for the packages added/removed in a repository image. """

    @require_repo_read
    @nickname('getRepoImagePackages')
    def get(self, namespace, repository, imageid):
        """ Fetches the packages added/removed in the given repo image. """
        image = model.image.get_repo_image(namespace, repository, imageid)
        if image is None:
            raise NotFound()

        if image.security_indexed:
            # The layer is addressed as "<docker image id>.<storage uuid>".
            docker_id = image.docker_image_id
            storage_uuid = image.storage.uuid
            layer = '%s.%s' % (docker_id, storage_uuid)

            packages = _call_security_api('layers/%s/packages', layer)
            return {'security_indexed': True, 'data': packages}

        # Without the security_indexed flag only the flag itself is returned.
        return {'security_indexed': False}