71 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			71 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ List and manage repository vulnerabilities and other security information. """
 | |
| 
 | |
| import logging
 | |
| import features
 | |
| 
 | |
| from app import secscan_api
 | |
| from data import model
 | |
| from endpoints.api import (require_repo_read, path_param,
 | |
|                            RepositoryParamResource, resource, nickname, show_if, parse_args,
 | |
|                            query_param, truthy_bool)
 | |
| from endpoints.exception import NotFound, DownstreamIssue
 | |
| from util.secscan.api import APIRequestFailure
 | |
| 
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class SCAN_STATUS(object):
 | |
|   """ Security scan status enum """
 | |
|   SCANNED = 'scanned'
 | |
|   FAILED = 'failed'
 | |
|   QUEUED = 'queued'
 | |
| 
 | |
| 
 | |
| def _get_status(repo_image):
 | |
|   if repo_image.security_indexed_engine is not None and repo_image.security_indexed_engine >= 0:
 | |
|     return SCAN_STATUS.SCANNED if repo_image.security_indexed else SCAN_STATUS.FAILED
 | |
| 
 | |
|   return SCAN_STATUS.QUEUED
 | |
| 
 | |
| 
 | |
| @show_if(features.SECURITY_SCANNER)
 | |
| @resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security')
 | |
| @path_param('repository', 'The full path of the repository. e.g. namespace/name')
 | |
| @path_param('imageid', 'The image ID')
 | |
| class RepositoryImageSecurity(RepositoryParamResource):
 | |
|   """ Operations for managing the vulnerabilities in a repository image. """
 | |
| 
 | |
|   @require_repo_read
 | |
|   @nickname('getRepoImageSecurity')
 | |
|   @parse_args()
 | |
|   @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
 | |
|                default=False)
 | |
|   def get(self, namespace, repository, imageid, parsed_args):
 | |
|     """ Fetches the features and vulnerabilities (if any) for a repository image. """
 | |
|     repo_image = model.image.get_repo_image(namespace, repository, imageid)
 | |
|     if repo_image is None:
 | |
|       raise NotFound()
 | |
| 
 | |
|     if not repo_image.security_indexed:
 | |
|       logger.debug('Image %s under repository %s/%s not security indexed',
 | |
|                    repo_image.docker_image_id, namespace, repository)
 | |
|       return {
 | |
|         'status': _get_status(repo_image),
 | |
|       }
 | |
| 
 | |
|     try:
 | |
|       if parsed_args.vulnerabilities:
 | |
|         data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True)
 | |
|       else:
 | |
|         data = secscan_api.get_layer_data(repo_image, include_features=True)
 | |
|     except APIRequestFailure as arf:
 | |
|       raise DownstreamIssue({'message': arf.message})
 | |
| 
 | |
|     if data is None:
 | |
|       raise NotFound()
 | |
| 
 | |
|     return {
 | |
|       'status': _get_status(repo_image),
 | |
|       'data': data,
 | |
|     }
 |