Change secscan API endpoints to use new registry model interface
This commit is contained in:
parent
dcaa98a428
commit
46edebe6b0
6 changed files with 81 additions and 37 deletions
|
@ -1,3 +1,5 @@
|
|||
from enum import Enum, unique
|
||||
|
||||
from data.registry_model.datatype import datatype, requiresinput
|
||||
|
||||
class RepositoryReference(datatype('Repository', [])):
|
||||
|
@ -107,3 +109,11 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen
|
|||
return []
|
||||
|
||||
return [Tag.for_repository_tag(tag) for tag in tags]
|
||||
|
||||
|
||||
@unique
|
||||
class SecurityScanStatus(Enum):
|
||||
""" Security scan status enum """
|
||||
SCANNED = 'scanned'
|
||||
FAILED = 'failed'
|
||||
QUEUED = 'queued'
|
||||
|
|
|
@ -118,3 +118,7 @@ class RegistryDataInterface(object):
|
|||
@abstractmethod
|
||||
def get_legacy_images_owned_by_tag(self, tag):
|
||||
""" Returns all legacy images *solely owned and used* by the given tag. """
|
||||
|
||||
@abstractmethod
|
||||
def get_security_status(self, manifest_or_legacy_image):
|
||||
""" Returns the security status for the given manifest or legacy image or None if none. """
|
||||
|
|
|
@ -5,7 +5,8 @@ from collections import defaultdict
|
|||
from data import database
|
||||
from data import model
|
||||
from data.registry_model.interface import RegistryDataInterface
|
||||
from data.registry_model.datatypes import Tag, RepositoryReference, Manifest, LegacyImage, Label
|
||||
from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label,
|
||||
SecurityScanStatus)
|
||||
|
||||
|
||||
class PreOCIModel(RegistryDataInterface):
|
||||
|
@ -267,5 +268,26 @@ class PreOCIModel(RegistryDataInterface):
|
|||
|
||||
return [LegacyImage.for_image(image, images_map=images_map) for image in images]
|
||||
|
||||
def get_security_status(self, manifest_or_legacy_image):
|
||||
""" Returns the security status for the given manifest or legacy image or None if none. """
|
||||
image = None
|
||||
|
||||
if isinstance(manifest_or_legacy_image, Manifest):
|
||||
try:
|
||||
tag_manifest = database.TagManifest.get(id=manifest_or_legacy_image._db_id)
|
||||
image = tag_manifest.tag.image
|
||||
except database.TagManifest.DoesNotExist:
|
||||
return None
|
||||
else:
|
||||
try:
|
||||
image = database.Image.get(id=manifest_or_legacy_image._db_id)
|
||||
except database.Image.DoesNotExist:
|
||||
return None
|
||||
|
||||
if image.security_indexed_engine is not None and image.security_indexed_engine >= 0:
|
||||
return SecurityScanStatus.SCANNED if image.security_indexed else SecurityScanStatus.FAILED
|
||||
|
||||
return SecurityScanStatus.QUEUED
|
||||
|
||||
|
||||
pre_oci_model = PreOCIModel()
|
||||
|
|
|
@ -293,3 +293,12 @@ def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_
|
|||
non_empty.add(tag.name)
|
||||
|
||||
assert non_empty == set(expected_non_empty)
|
||||
|
||||
|
||||
def test_get_security_status(pre_oci_model):
|
||||
repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
|
||||
tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
|
||||
assert len(tags)
|
||||
|
||||
for tag in tags:
|
||||
assert pre_oci_model.get_security_status(tag.legacy_image)
|
||||
|
|
Reference in a new issue