118 lines
		
	
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ List and manage repository vulnerabilities and other sec information. """
 | |
| 
 | |
| import logging
 | |
| import features
 | |
| import json
 | |
| import requests
 | |
| 
 | |
| from app import secscan_api
 | |
| from data import model
 | |
| from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
 | |
|                            RepositoryParamResource, resource, nickname, show_if, parse_args,
 | |
|                            query_param)
 | |
| 
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class SCAN_STATUS(object):
 | |
|   """ Security scan status enum """
 | |
|   SCANNED = 'scanned'
 | |
|   FAILED = 'failed'
 | |
|   QUEUED = 'queued'
 | |
| 
 | |
| 
 | |
| def _call_security_api(relative_url, *args, **kwargs):
 | |
|   """ Issues an HTTP call to the sec API at the given relative URL. """
 | |
|   try:
 | |
|     response = secscan_api.call(relative_url, None, *args, **kwargs)
 | |
|   except requests.exceptions.Timeout:
 | |
|     raise DownstreamIssue(payload=dict(message='API call timed out'))
 | |
|   except requests.exceptions.ConnectionError:
 | |
|     raise DownstreamIssue(payload=dict(message='Could not connect to downstream service'))
 | |
| 
 | |
|   if response.status_code == 404:
 | |
|     raise NotFound()
 | |
| 
 | |
|   try:
 | |
|     response_data = json.loads(response.text)
 | |
|   except ValueError:
 | |
|     raise DownstreamIssue(payload=dict(message='Non-json response from downstream service'))
 | |
| 
 | |
|   if response.status_code / 100 != 2:
 | |
|     logger.warning('Got %s status code to call: %s', response.status_code, response.text)
 | |
|     raise DownstreamIssue(payload=dict(message=response_data['Message']))
 | |
| 
 | |
|   return response_data
 | |
| 
 | |
| 
 | |
| def _get_status(repo_image):
 | |
|   if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0:
 | |
|     return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED
 | |
| 
 | |
|   return SCAN_STATUS.QUEUED
 | |
| 
 | |
| 
 | |
| @show_if(features.SECURITY_SCANNER)
 | |
| @resource('/v1/repository/<repopath:repository>/image/<imageid>/vulnerabilities')
 | |
| @path_param('repository', 'The full path of the repository. e.g. namespace/name')
 | |
| @path_param('imageid', 'The image ID')
 | |
| class RepositoryImageVulnerabilities(RepositoryParamResource):
 | |
|   """ Operations for managing the vulnerabilities in a repository image. """
 | |
| 
 | |
|   @require_repo_read
 | |
|   @nickname('getRepoImageVulnerabilities')
 | |
|   @parse_args
 | |
|   @query_param('minimumPriority', 'Minimum vulnerability priority', type=str,
 | |
|                default='Low')
 | |
|   def get(self, args, namespace, repository, imageid):
 | |
|     """ Fetches the vulnerabilities (if any) for a repository tag. """
 | |
|     repo_image = model.image.get_repo_image(namespace, repository, imageid)
 | |
|     if repo_image is None:
 | |
|       raise NotFound()
 | |
| 
 | |
|     if not repo_image.security_indexed:
 | |
|       logger.debug('Image %s under repository %s/%s not security indexed',
 | |
|                    repo_image.docker_image_id, namespace, repository)
 | |
|       return {
 | |
|         'status': _get_status(repo_image),
 | |
|       }
 | |
| 
 | |
|     layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid)
 | |
|     data = _call_security_api('layers/%s/vulnerabilities', layer_id,
 | |
|                               minimumPriority=args.minimumPriority)
 | |
| 
 | |
|     return {
 | |
|       'status': _get_status(repo_image),
 | |
|       'data': data,
 | |
|     }
 | |
| 
 | |
| 
 | |
| @show_if(features.SECURITY_SCANNER)
 | |
| @resource('/v1/repository/<repopath:repository>/image/<imageid>/packages')
 | |
| @path_param('repository', 'The full path of the repository. e.g. namespace/name')
 | |
| @path_param('imageid', 'The image ID')
 | |
| class RepositoryImagePackages(RepositoryParamResource):
 | |
|   """ Operations for listing the packages added/removed in an image. """
 | |
| 
 | |
|   @require_repo_read
 | |
|   @nickname('getRepoImagePackages')
 | |
|   def get(self, namespace, repository, imageid):
 | |
|     """ Fetches the packages added/removed in the given repo image. """
 | |
|     repo_image = model.image.get_repo_image(namespace, repository, imageid)
 | |
|     if repo_image is None:
 | |
|       raise NotFound()
 | |
| 
 | |
|     if not repo_image.security_indexed:
 | |
|       return {
 | |
|         'status': _get_status(repo_image),
 | |
|       }
 | |
| 
 | |
|     layer_id = '%s.%s' % (repo_image.docker_image_id, repo_image.storage.uuid)
 | |
|     data = _call_security_api('layers/%s/packages', layer_id)
 | |
| 
 | |
|     return {
 | |
|       'status': _get_status(repo_image),
 | |
|       'data': data,
 | |
|     }
 | |
| 
 |