This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/secscan.py
Joseph Schorr a69c9e12fd Update quay sec code to fix problems identified in previous review
- Change get_repository_images_recursive to operate over a single docker image and storage uuid
- Move endpoints/sec to endpoints/secscan
- Change notification system to work with new Quay-sec format

Fixes #768
2015-11-09 17:14:35 -05:00

103 lines
3.5 KiB
Python

""" List and manage repository vulnerabilities and other sec information. """
import logging
import features
import json
import requests
from app import secscan_endpoint
from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args,
query_param)
logger = logging.getLogger(__name__)
def _call_security_api(relative_url, *args, **kwargs):
""" Issues an HTTP call to the sec API at the given relative URL. """
try:
response = secscan_endpoint.call_api(relative_url, body=None, *args, **kwargs)
except requests.exceptions.Timeout:
raise DownstreamIssue(payload=dict(message='API call timed out'))
except requests.exceptions.ConnectionError:
raise DownstreamIssue(payload=dict(message='Could not connect to downstream service'))
if response.status_code == 404:
raise NotFound()
try:
response_data = json.loads(response.text)
except ValueError:
raise DownstreamIssue(payload=dict(message='Non-json response from downstream service'))
if response.status_code / 100 != 2:
logger.warning('Got %s status code to call: %s', response.status_code, response.text)
raise DownstreamIssue(payload=dict(message=response_data['Message']))
return response_data
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/tag/<tag>/vulnerabilities')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('tag', 'The name of the tag')
class RepositoryTagVulnerabilities(RepositoryParamResource):
""" Operations for managing the vulnerabilities in a repository tag. """
@require_repo_read
@nickname('getRepoTagVulnerabilities')
@parse_args
@query_param('minimumPriority', 'Minimum vulnerability priority', type=str,
default='Low')
def get(self, args, namespace, repository, tag):
""" Fetches the vulnerabilities (if any) for a repository tag. """
try:
tag_image = model.tag.get_tag_image(namespace, repository, tag)
except model.DataModelException:
raise NotFound()
if not tag_image.security_indexed:
logger.debug('Image %s for tag %s under repository %s/%s not security indexed',
tag_image.docker_image_id, tag, namespace, repository)
return {
'security_indexed': False
}
data = _call_security_api('layers/%s/vulnerabilities', tag_image.docker_image_id,
minimumPriority=args.minimumPriority)
return {
'security_indexed': True,
'data': data,
}
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<repopath:repository>/image/<imageid>/packages')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImagePackages(RepositoryParamResource):
""" Operations for listing the packages added/removed in an image. """
@require_repo_read
@nickname('getRepoImagePackages')
def get(self, namespace, repository, imageid):
""" Fetches the packages added/removed in the given repo image. """
repo_image = model.image.get_repo_image(namespace, repository, imageid)
if repo_image is None:
raise NotFound()
if not repo_image.security_indexed:
return {
'security_indexed': False
}
data = _call_security_api('layers/%s/packages/diff', repo_image.docker_image_id)
return {
'security_indexed': True,
'data': data,
}