# This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
# quay/endpoints/api/secscan.py
#
# 91 lines
# 3.1 KiB
# Python
# Raw Normal View History

""" List and manage repository vulnerabilities and other security information. """
import logging
import features
import json
import requests
from app import secscan_api
from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args,
query_param, truthy_bool)
logger = logging.getLogger(__name__)
class SCAN_STATUS(object):
    """ String constants naming the possible security scan states of an image. """

    # Returned by _get_status when no non-negative indexing engine version is recorded.
    QUEUED = 'queued'

    # Returned by _get_status when an engine version is recorded and the image is indexed.
    SCANNED = 'scanned'

    # Returned by _get_status when an engine version is recorded but the image is not indexed.
    FAILED = 'failed'
def _call_security_api(relative_url, *args, **kwargs):
    """ Issues an HTTP call to the sec API at the given relative URL.

    The relative URL is handed to ``secscan_api.call`` followed by an explicit
    ``None`` and then any extra positional/keyword arguments, unchanged.

    Returns:
      The decoded JSON body of the response when the status code is 2xx.

    Raises:
      NotFound: if the downstream service answered with a 404.
      DownstreamIssue: if the call timed out, the connection failed, the body
        was not valid JSON, or the status code was outside the 2xx range.
    """
    try:
        response = secscan_api.call(relative_url, None, *args, **kwargs)
    except requests.exceptions.Timeout:
        raise DownstreamIssue(payload=dict(message='API call timed out'))
    except requests.exceptions.ConnectionError:
        raise DownstreamIssue(payload=dict(message='Could not connect to downstream service'))

    # A 404 is reported before the body is parsed, so it never depends on the body being JSON.
    if response.status_code == 404:
        raise NotFound()

    try:
        response_data = json.loads(response.text)
    except ValueError:
        raise DownstreamIssue(payload=dict(message='Non-json response from downstream service'))

    # Floor division: with true division a 201/204 would yield 2.01/2.04 and be
    # misreported as a failure.
    if response.status_code // 100 != 2:
        logger.warning('Got %s status code to call: %s', response.status_code, response.text)

        # The error body is not guaranteed to be an object carrying a 'Message' key;
        # fall back to a generic message instead of failing with KeyError/TypeError.
        message = None
        if isinstance(response_data, dict):
            message = response_data.get('Message')
        if message is None:
            message = 'Unexpected status code from downstream service'
        raise DownstreamIssue(payload=dict(message=message))

    return response_data
def _get_status(repo_image):
    """ Returns the SCAN_STATUS value describing the given repository image.

    The image is considered processed when ``security_indexed_engine`` is set
    to a non-negative value; in that case the result is SCANNED if
    ``security_indexed`` is truthy and FAILED otherwise. In every other case
    the image is reported as QUEUED.
    """
    if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0:
        return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED

    return SCAN_STATUS.QUEUED
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImageSecurity(RepositoryParamResource):
    """ Operations for managing the vulnerabilities in a repository image. """

    @require_repo_read
    @nickname('getRepoImageSecurity')
    @parse_args()
    @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
                 default=False)
    def get(self, namespace, repository, imageid, parsed_args):
        """ Fetches the features and vulnerabilities (if any) for a repository tag. """
        # NOTE(review): the lookup below is keyed by image ID, not by tag.
        image = model.image.get_repo_image(namespace, repository, imageid)
        if image is None:
            raise NotFound()

        # Images that have not been indexed only report their status; the
        # security API is not contacted for them.
        if not image.security_indexed:
            logger.debug('Image %s under repository %s/%s not security indexed',
                         image.docker_image_id, namespace, repository)
            return {'status': _get_status(image)}

        # The layer identifier is the docker image ID joined to the storage UUID.
        layer_id = '%s.%s' % (image.docker_image_id, image.storage.uuid)

        # The query string selects whether vulnerabilities or only features are requested.
        if parsed_args.vulnerabilities:
            url_template = 'layers/%s?vulnerabilities'
        else:
            url_template = 'layers/%s?features'
        data = _call_security_api(url_template, layer_id)

        result = {'status': _get_status(image)}
        result['data'] = data
        return result