This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/secscan.py

119 lines
3.9 KiB
Python
Raw Normal View History

""" List and manage repository vulnerabilities and other sec information. """
import logging
import features
import json
import requests
from app import secscan_api
from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args,
query_param)
logger = logging.getLogger(__name__)
class SCAN_STATUS(object):
""" Security scan status enum """
SCANNED = 'scanned'
FAILED = 'failed'
QUEUED = 'queued'
def _call_security_api(relative_url, *args, **kwargs):
""" Issues an HTTP call to the sec API at the given relative URL. """
try:
response = secscan_api.call(relative_url, None, *args, **kwargs)
except requests.exceptions.Timeout:
raise DownstreamIssue(payload=dict(message='API call timed out'))
except requests.exceptions.ConnectionError:
raise DownstreamIssue(payload=dict(message='Could not connect to downstream service'))
if response.status_code == 404:
raise NotFound()
try:
response_data = json.loads(response.text)
except ValueError:
raise DownstreamIssue(payload=dict(message='Non-json response from downstream service'))
if response.status_code / 100 != 2:
logger.warning('Got %s status code to call: %s', response.status_code, response.text)
raise DownstreamIssue(payload=dict(message=response_data['Message']))
return response_data
def _get_status(repo_image):
if repo_image.security_indexed_engine:
return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED
return SCAN_STATUS.QUEUED
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/image/<imageid>/vulnerabilities')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImageVulnerabilities(RepositoryParamResource):
""" Operations for managing the vulnerabilities in a repository image. """
@require_repo_read
@nickname('getRepoImageVulnerabilities')
@parse_args
@query_param('minimumPriority', 'Minimum vulnerability priority', type=str,
default='Low')
def get(self, args, namespace, repository, imageid):
""" Fetches the vulnerabilities (if any) for a repository tag. """
repo_image = model.image.get_repo_image(namespace, repository, imageid)
if repo_image is None:
raise NotFound()
if not repo_image.security_indexed:
logger.debug('Image %s under repository %s/%s not security indexed',
repo_image.docker_image_id, namespace, repository)
return {
'status': _get_status(repo_image),
}
layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid)
data = _call_security_api('layers/%s/vulnerabilities', layer_id,
minimumPriority=args.minimumPriority)
return {
'status': _get_status(repo_image),
'data': data,
}
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/image/<imageid>/packages')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImagePackages(RepositoryParamResource):
""" Operations for listing the packages added/removed in an image. """
@require_repo_read
@nickname('getRepoImagePackages')
def get(self, namespace, repository, imageid):
""" Fetches the packages added/removed in the given repo image. """
repo_image = model.image.get_repo_image(namespace, repository, imageid)
if repo_image is None:
raise NotFound()
if not repo_image.security_indexed:
return {
'status': _get_status(repo_image),
}
layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid)
data = _call_security_api('layers/%s/packages', layer_id)
return {
'status': _get_status(repo_image),
'data': data,
}