105 lines
		
	
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			105 lines
		
	
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ List and manage repository vulnerabilities and other security information. """
 | |
| 
 | |
| import logging
 | |
| import features
 | |
| 
 | |
| from app import secscan_api
 | |
| from data import model
 | |
| from endpoints.api import (require_repo_read, path_param,
 | |
|                            RepositoryParamResource, resource, nickname, show_if, parse_args,
 | |
|                            query_param, truthy_bool, disallow_for_app_repositories)
 | |
| from endpoints.exception import NotFound, DownstreamIssue
 | |
| from endpoints.api.manifest import MANIFEST_DIGEST_ROUTE
 | |
| from util.secscan.api import APIRequestFailure
 | |
| 
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
class SCAN_STATUS(object):
  """ String constants naming the possible security scan states of an image. """
  # Plain class attributes (not instances); the raw strings are what ends up in the
  # 'status' field of the security API responses built in this module.
  QUEUED = 'queued'
  FAILED = 'failed'
  SCANNED = 'scanned'
 | |
| 
 | |
| 
 | |
def _get_status(repo_image):
  """ Maps the security indexing fields of the given image onto a SCAN_STATUS value.

      An image whose security_indexed_engine is None or negative is reported as QUEUED.
      Otherwise the security_indexed flag decides between SCANNED (truthy) and FAILED (falsy).
  """
  engine_version = repo_image.security_indexed_engine
  has_engine_result = engine_version is not None and engine_version >= 0
  if not has_engine_result:
    return SCAN_STATUS.QUEUED

  if repo_image.security_indexed:
    return SCAN_STATUS.SCANNED

  return SCAN_STATUS.FAILED
 | |
| 
 | |
def _security_status_for_image(namespace, repository, repo_image, include_vulnerabilities=True):
  """ Returns a dict representing the result of a call to the security status API for the given
      image.

      The dict always contains a 'status' key holding the SCAN_STATUS value for the image. If the
      image has been security indexed, it also contains a 'data' key holding the layer data
      returned by the security scanner API: requested with vulnerabilities when
      include_vulnerabilities is truthy, and with features only otherwise.

      Raises DownstreamIssue if the request to the security scanner fails, and NotFound if the
      security scanner returned no data for the image.
  """
  result = {
    'status': _get_status(repo_image),
  }

  if not repo_image.security_indexed:
    # Nothing to ask the scanner about yet; report the status on its own.
    logger.debug('Image %s under repository %s/%s not security indexed',
                 repo_image.docker_image_id, namespace, repository)
    return result

  try:
    if include_vulnerabilities:
      data = secscan_api.get_layer_data(repo_image, include_vulnerabilities=True)
    else:
      data = secscan_api.get_layer_data(repo_image, include_features=True)
  except APIRequestFailure as arf:
    # BaseException.message was deprecated in Python 2.6 and no longer exists in Python 3.
    # Use it when present (keeping the existing behavior), and otherwise fall back to the
    # string form of the exception so this handler cannot itself fail with AttributeError.
    message = arf.message if hasattr(arf, 'message') else str(arf)
    raise DownstreamIssue({'message': message})

  if data is None:
    raise NotFound()

  result['data'] = data
  return result
 | |
| 
 | |
| 
 | |
@show_if(features.SECURITY_SCANNER)
@resource('/v1/repository/<apirepopath:repository>/image/<imageid>/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('imageid', 'The image ID')
class RepositoryImageSecurity(RepositoryParamResource):
  """ Operations for managing the vulnerabilities in a repository image. """

  # NOTE(review): the decorator stack is kept in its original order; decorators are applied
  # bottom-up, so reordering them could change how the handler is wrapped.
  @require_repo_read
  @nickname('getRepoImageSecurity')
  @disallow_for_app_repositories
  @parse_args()
  @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
               default=False)
  def get(self, namespace, repository, imageid, parsed_args):
    """ Fetches the features and vulnerabilities (if any) for a repository image. """
    # Resolve the image within the repository; a missing image is reported as NotFound.
    found_image = model.image.get_repo_image(namespace, repository, imageid)
    if found_image is not None:
      # The 'vulnerabilities' query parameter selects vulnerability data vs. features only.
      want_vulnerabilities = parsed_args.vulnerabilities
      return _security_status_for_image(namespace, repository, found_image,
                                        want_vulnerabilities)

    raise NotFound()
 | |
| 
 | |
@show_if(features.SECURITY_SCANNER)
@resource(MANIFEST_DIGEST_ROUTE + '/security')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
@path_param('manifestref', 'The digest of the manifest')
class RepositoryManifestSecurity(RepositoryParamResource):
  """ Operations for managing the vulnerabilities in a repository manifest. """

  # NOTE(review): the decorator stack is kept in its original order; decorators are applied
  # bottom-up, so reordering them could change how the handler is wrapped.
  @require_repo_read
  @nickname('getRepoManifestSecurity')
  @disallow_for_app_repositories
  @parse_args()
  @query_param('vulnerabilities', 'Include vulnerabilities informations', type=truthy_bool,
               default=False)
  def get(self, namespace, repository, manifestref, parsed_args):
    # Look the manifest up by its digest; a data model error from the lookup is reported to
    # the caller as NotFound.
    try:
      manifest_row = model.tag.load_manifest_by_digest(namespace, repository, manifestref)
    except model.DataModelException:
      raise NotFound()

    # The security status is computed for the image reached through the manifest's tag.
    return _security_status_for_image(namespace, repository, manifest_row.tag.image,
                                      parsed_args.vulnerabilities)
 |