Merge pull request #3297 from quay/joseph.schorr/QUAY-1241/secscan-v22-fix

Fix loading of security scan results for OCI-style manifests
This commit is contained in:
Joseph Schorr 2018-12-04 13:04:48 -05:00 committed by GitHub
commit cbe151c21d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -8,9 +8,9 @@ from urlparse import urljoin
import requests
from data import model
from data.database import CloseForLongOperation, TagManifest, Image
from data.database import CloseForLongOperation, TagManifest, Image, Manifest, ManifestLegacyImage
from data.model.storage import get_storage_locations
from data.registry_model.datatypes import Manifest, LegacyImage
from data.registry_model.datatypes import Manifest as ManifestDataType, LegacyImage
from util.abchelpers import nooper
from util.failover import failover, FailoverException
from util.secscan.validator import SecurityConfigValidator
@ -63,8 +63,15 @@ _API_METHOD_PING = 'metrics'
def compute_layer_id(layer):
""" Returns the ID for the layer in the security scanner. """
# NOTE: this is temporary until we switch to Clair V3.
if isinstance(layer, Manifest):
if isinstance(layer, ManifestDataType):
if layer._is_tag_manifest:
layer = TagManifest.get(id=layer._db_id).tag.image
else:
manifest = Manifest.get(id=layer._db_id)
try:
layer = ManifestLegacyImage.get(manifest=manifest).image
except ManifestLegacyImage.DoesNotExist:
return None
elif isinstance(layer, LegacyImage):
layer = Image.get(id=layer._db_id)
@ -211,12 +218,16 @@ class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
""" Create the request body to submit the given layer for analysis. If the layer's URL cannot
be found, returns None.
"""
layer_id = compute_layer_id(layer)
if layer_id is None:
return None
url, auth_header = self._get_image_url_and_auth(layer)
if url is None:
return None
layer_request = {
'Name': compute_layer_id(layer),
'Name': layer_id,
'Path': url,
'Format': 'Docker',
}
@ -265,6 +276,9 @@ class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
its database.
"""
layer_id = compute_layer_id(layer)
if layer_id is None:
return None
try:
self._call('DELETE', _API_METHOD_DELETE_LAYER % layer_id)
return True
@ -381,6 +395,9 @@ class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
""" Returns the layer data for the specified layer. On error, returns None. """
layer_id = compute_layer_id(layer)
if layer_id is None:
return None
return self._get_layer_data(layer_id, include_features, include_vulnerabilities)
def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):