Merge pull request #3224 from quay/joseph.schorr/QUAY-1030/interfacing-part6
Continue work on interfacing registry data access
This commit is contained in:
commit
f209c7e8ea
6 changed files with 58 additions and 313 deletions
|
@ -8,7 +8,6 @@ from data import model
|
|||
from data.registry_model import registry_model
|
||||
from data.registry_model.datatypes import RepositoryReference
|
||||
from data.database import UseThenDisconnect
|
||||
from util.imagetree import ImageTree
|
||||
from util.morecollections import AttrDict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
@ -22,8 +22,7 @@ def image_dict(image, with_history=False, with_tags=False):
|
|||
image_data['tags'] = [tag.name for tag in image.tags]
|
||||
|
||||
if with_history:
|
||||
image_data['history'] = [image_dict(parent, with_history, with_tags)
|
||||
for parent in image.parents]
|
||||
image_data['history'] = [image_dict(parent) for parent in image.parents]
|
||||
|
||||
# Calculate the ancestors string, with the DBID's replaced with the docker IDs.
|
||||
parent_docker_ids = [parent_image.docker_image_id for parent_image in image.parents]
|
||||
|
|
|
@ -1,22 +1,24 @@
|
|||
import pytest
|
||||
|
||||
from data import model
|
||||
from data.registry_model import registry_model
|
||||
from endpoints.api.manifest import RepositoryManifest
|
||||
from endpoints.api.test.shared import conduct_api_call
|
||||
from endpoints.test.shared import client_with_identity
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
def test_repository_manifest(client):
|
||||
with client_with_identity('devtable', client) as cl:
|
||||
tags = model.tag.list_repository_tags('devtable', 'simple')
|
||||
digests = model.tag.get_tag_manifest_digests(tags)
|
||||
repo_ref = registry_model.lookup_repository('devtable', 'simple')
|
||||
tags = registry_model.list_repository_tags(repo_ref)
|
||||
for tag in tags:
|
||||
manifest = digests[tag.id]
|
||||
manifest_digest = tag.manifest_digest
|
||||
if manifest_digest is None:
|
||||
continue
|
||||
|
||||
params = {
|
||||
'repository': 'devtable/simple',
|
||||
'manifestref': manifest,
|
||||
'manifestref': manifest_digest,
|
||||
}
|
||||
result = conduct_api_call(cl, RepositoryManifest, 'GET', params, None, 200).json
|
||||
assert result['digest'] == manifest
|
||||
assert result['digest'] == manifest_digest
|
||||
assert result['manifest_data']
|
||||
assert result['image']
|
||||
|
|
|
@ -27,6 +27,7 @@ from app import app, config_provider, all_queues, dockerfile_build_queue, notifi
|
|||
from buildtrigger.basehandler import BuildTriggerHandler
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import database, model, appr_model
|
||||
from data.registry_model import registry_model
|
||||
from data.appr_model.models import NEW_MODELS
|
||||
from data.database import RepositoryActionCount, Repository as RepositoryTable
|
||||
from test.helpers import assert_action_logged
|
||||
|
@ -2148,8 +2149,9 @@ class TestDeleteRepository(ApiTestCase):
|
|||
self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO))
|
||||
|
||||
# Make sure the repository has some images and tags.
|
||||
self.assertTrue(len(list(model.image.get_repository_images(ADMIN_ACCESS_USER, 'complex'))) > 0)
|
||||
self.assertTrue(len(list(model.tag.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))) > 0)
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
self.assertTrue(len(list(registry_model.get_legacy_images(repo_ref))) > 0)
|
||||
self.assertTrue(len(list(registry_model.list_repository_tags(repo_ref))) > 0)
|
||||
|
||||
# Add some data for the repository, in addition to is already existing images and tags.
|
||||
repository = model.repository.get_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
|
@ -2196,16 +2198,17 @@ class TestDeleteRepository(ApiTestCase):
|
|||
RepositoryActionCount.create(
|
||||
repository=repository, date=datetime.datetime.now() - datetime.timedelta(days=5), count=6)
|
||||
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
tag = registry_model.get_repo_tag(repo_ref, 'prod')
|
||||
manifest = registry_model.get_manifest_for_tag(tag)
|
||||
|
||||
# Create some labels.
|
||||
pre_delete_label_count = database.Label.select().count()
|
||||
registry_model.create_manifest_label(manifest, 'foo', 'bar', 'manifest')
|
||||
registry_model.create_manifest_label(manifest, 'foo', 'baz', 'manifest')
|
||||
registry_model.create_manifest_label(manifest, 'something', '{}', 'api',
|
||||
media_type_name='application/json')
|
||||
|
||||
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
|
||||
model.label.create_manifest_label(tag_manifest, 'foo', 'bar', 'manifest')
|
||||
model.label.create_manifest_label(tag_manifest, 'foo', 'baz', 'manifest')
|
||||
model.label.create_manifest_label(tag_manifest, 'something', '{}', 'api',
|
||||
media_type_name='application/json')
|
||||
|
||||
model.label.create_manifest_label(tag_manifest, 'something', '{"some": "json"}', 'manifest')
|
||||
registry_model.create_manifest_label(manifest, 'something', '{"some": "json"}', 'manifest')
|
||||
|
||||
# Delete the repository.
|
||||
with check_transitive_modifications():
|
||||
|
@ -2214,10 +2217,6 @@ class TestDeleteRepository(ApiTestCase):
|
|||
# Verify the repo was deleted.
|
||||
self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO), expected_code=404)
|
||||
|
||||
# Verify the labels are gone.
|
||||
post_delete_label_count = database.Label.select().count()
|
||||
self.assertEquals(post_delete_label_count, pre_delete_label_count)
|
||||
|
||||
|
||||
class TestGetRepository(ApiTestCase):
|
||||
PUBLIC_REPO = PUBLIC_USER + '/publicrepo'
|
||||
|
@ -2908,23 +2907,19 @@ class TestListAndDeleteTag(ApiTestCase):
|
|||
def test_listtagpagination(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
latest_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, "complex", "prod")
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, "simple")
|
||||
latest_tag = registry_model.get_repo_tag(repo_ref, 'latest', include_legacy_image=True)
|
||||
|
||||
# Create 10 tags in an empty repo.
|
||||
user = model.user.get_user_or_org(ADMIN_ACCESS_USER)
|
||||
repo = model.repository.create_repository(ADMIN_ACCESS_USER, "empty", user)
|
||||
|
||||
image = model.image.find_create_or_link_image(latest_image.docker_image_id, repo,
|
||||
ADMIN_ACCESS_USER, {}, ['local_us'])
|
||||
remaining_tags = set()
|
||||
for i in xrange(1, 11):
|
||||
# Create 8 tags in the simple repo.
|
||||
remaining_tags = {'latest', 'prod'}
|
||||
for i in xrange(1, 9):
|
||||
tag_name = "tag" + str(i)
|
||||
remaining_tags.add(tag_name)
|
||||
model.tag.create_or_update_tag(ADMIN_ACCESS_USER, "empty", tag_name, image.docker_image_id)
|
||||
registry_model.retarget_tag(repo_ref, tag_name, latest_tag.legacy_image)
|
||||
|
||||
# Make sure we can iterate over all of them.
|
||||
json = self.getJsonResponse(ListRepositoryTags, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/empty', page=1, limit=5))
|
||||
repository=ADMIN_ACCESS_USER + '/simple', page=1, limit=5))
|
||||
self.assertEquals(1, json['page'])
|
||||
self.assertEquals(5, len(json['tags']))
|
||||
self.assertTrue(json['has_additional'])
|
||||
|
@ -2934,7 +2929,7 @@ class TestListAndDeleteTag(ApiTestCase):
|
|||
self.assertEquals(5, len(remaining_tags))
|
||||
|
||||
json = self.getJsonResponse(ListRepositoryTags, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/empty', page=2, limit=5))
|
||||
repository=ADMIN_ACCESS_USER + '/simple', page=2, limit=5))
|
||||
|
||||
self.assertEquals(2, json['page'])
|
||||
self.assertEquals(5, len(json['tags']))
|
||||
|
@ -2945,7 +2940,7 @@ class TestListAndDeleteTag(ApiTestCase):
|
|||
self.assertEquals(0, len(remaining_tags))
|
||||
|
||||
json = self.getJsonResponse(ListRepositoryTags, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/empty', page=3, limit=5))
|
||||
repository=ADMIN_ACCESS_USER + '/simple', page=3, limit=5))
|
||||
|
||||
self.assertEquals(3, json['page'])
|
||||
self.assertEquals(0, len(json['tags']))
|
||||
|
@ -4136,77 +4131,6 @@ class TestSuperUserConfig(ApiTestCase):
|
|||
mockldap.stop()
|
||||
|
||||
|
||||
class TestRepositoryImageSecurity(ApiTestCase):
|
||||
def test_get_vulnerabilities(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
tag = model.tag.get_active_tag(ADMIN_ACCESS_USER, 'simple', 'latest')
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
|
||||
|
||||
tag_manifest = database.TagManifest.get(tag=tag)
|
||||
|
||||
# Grab the security info for the tag. It should be queued.
|
||||
manifest_response = self.getJsonResponse(RepositoryManifestSecurity, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/simple', manifestref=tag_manifest.digest,
|
||||
vulnerabilities='true'))
|
||||
|
||||
image_response = self.getJsonResponse(
|
||||
RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
||||
imageid=layer.docker_image_id, vulnerabilities='true'))
|
||||
|
||||
self.assertEquals(manifest_response, image_response)
|
||||
self.assertEquals('queued', image_response['status'])
|
||||
|
||||
# Mark the layer as indexed.
|
||||
layer.security_indexed = True
|
||||
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
||||
layer.save()
|
||||
|
||||
# Grab the security info again.
|
||||
with fake_security_scanner() as security_scanner:
|
||||
security_scanner.add_layer(security_scanner.layer_id(layer))
|
||||
|
||||
manifest_response = self.getJsonResponse(RepositoryManifestSecurity, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/simple', manifestref=tag_manifest.digest,
|
||||
vulnerabilities='true'))
|
||||
|
||||
image_response = self.getJsonResponse(RepositoryImageSecurity, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id,
|
||||
vulnerabilities='true'))
|
||||
|
||||
self.assertEquals(manifest_response, image_response)
|
||||
self.assertEquals('scanned', image_response['status'])
|
||||
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
|
||||
|
||||
def test_get_vulnerabilities_read_failover(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# Get a layer and mark it as indexed.
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
|
||||
layer.security_indexed = True
|
||||
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
||||
layer.save()
|
||||
|
||||
with fake_security_scanner(hostname='failoverscanner') as security_scanner:
|
||||
# Query the wrong security scanner URL without failover.
|
||||
self.getResponse(RepositoryImageSecurity, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id,
|
||||
vulnerabilities='true'), expected_code=520)
|
||||
|
||||
# Set the failover URL in the global config.
|
||||
with AppConfigChange({
|
||||
'SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS': ['https://failoverscanner']
|
||||
}):
|
||||
# Configure the API to return 200 for this layer.
|
||||
layer_id = security_scanner.layer_id(layer)
|
||||
security_scanner.set_ok_layer_id(layer_id)
|
||||
|
||||
# Call the API and succeed on failover.
|
||||
self.getResponse(RepositoryImageSecurity, params=dict(
|
||||
repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id,
|
||||
vulnerabilities='true'), expected_code=200)
|
||||
|
||||
|
||||
class TestSuperUserCustomCertificates(ApiTestCase):
|
||||
def test_custom_certificates(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
@ -4466,46 +4390,46 @@ class TestRepositoryManifestLabels(ApiTestCase):
|
|||
def test_basic_labels(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# Find the manifest digest for the prod tag in the complex repo.
|
||||
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
tag = registry_model.get_repo_tag(repo_ref, 'prod')
|
||||
repository = ADMIN_ACCESS_USER + '/complex'
|
||||
|
||||
# Check the existing labels on the complex repo, which should be empty
|
||||
json = self.getJsonResponse(
|
||||
RepositoryManifestLabels,
|
||||
params=dict(repository=repository, manifestref=tag_manifest.digest))
|
||||
params=dict(repository=repository, manifestref=tag.manifest_digest))
|
||||
|
||||
self.assertEquals(0, len(json['labels']))
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='bad_label', value='world',
|
||||
media_type='text/plain'), expected_code=400)
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='hello', value='world',
|
||||
media_type='bad_media_type'), expected_code=400)
|
||||
|
||||
# Add some labels to the manifest.
|
||||
with assert_action_logged('manifest_label_add'):
|
||||
label1 = self.postJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest), data=dict(
|
||||
repository=repository, manifestref=tag.manifest_digest), data=dict(
|
||||
key='hello', value='world', media_type='text/plain'), expected_code=201)
|
||||
|
||||
with assert_action_logged('manifest_label_add'):
|
||||
label2 = self.postJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest), data=dict(
|
||||
repository=repository, manifestref=tag.manifest_digest), data=dict(
|
||||
key='hi', value='there', media_type='text/plain'), expected_code=201)
|
||||
|
||||
with assert_action_logged('manifest_label_add'):
|
||||
label3 = self.postJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest), data=dict(
|
||||
repository=repository, manifestref=tag.manifest_digest), data=dict(
|
||||
key='hello', value='someone', media_type='application/json'), expected_code=201)
|
||||
|
||||
# Ensure we have *3* labels
|
||||
json = self.getJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest))
|
||||
repository=repository, manifestref=tag.manifest_digest))
|
||||
|
||||
self.assertEquals(3, len(json['labels']))
|
||||
|
||||
|
@ -4520,73 +4444,75 @@ class TestRepositoryManifestLabels(ApiTestCase):
|
|||
# Ensure we can retrieve each of the labels.
|
||||
for label in json['labels']:
|
||||
label_json = self.getJsonResponse(ManageRepositoryManifestLabel, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest, labelid=label['id']))
|
||||
repository=repository, manifestref=tag.manifest_digest, labelid=label['id']))
|
||||
self.assertEquals(label['id'], label_json['id'])
|
||||
|
||||
# Delete a label.
|
||||
with assert_action_logged('manifest_label_delete'):
|
||||
self.deleteEmptyResponse(ManageRepositoryManifestLabel, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest, labelid=label1['label']['id']))
|
||||
repository=repository, manifestref=tag.manifest_digest, labelid=label1['label']['id']))
|
||||
|
||||
# Ensure the label is gone.
|
||||
json = self.getJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest))
|
||||
repository=repository, manifestref=tag.manifest_digest))
|
||||
|
||||
self.assertEquals(2, len(json['labels']))
|
||||
|
||||
# Check filtering.
|
||||
json = self.getJsonResponse(RepositoryManifestLabels, params=dict(
|
||||
repository=repository, manifestref=tag_manifest.digest, filter='hello'))
|
||||
repository=repository, manifestref=tag.manifest_digest, filter='hello'))
|
||||
|
||||
self.assertEquals(1, len(json['labels']))
|
||||
|
||||
def test_prefixed_labels(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
# Find the manifest digest for the prod tag in the complex repo.
|
||||
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
tag = registry_model.get_repo_tag(repo_ref, 'prod')
|
||||
repository = ADMIN_ACCESS_USER + '/complex'
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='com.dockers.whatever', value='pants',
|
||||
media_type='text/plain'), expected_code=201)
|
||||
|
||||
self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='my.cool.prefix.for.my.label', value='value',
|
||||
media_type='text/plain'), expected_code=201)
|
||||
|
||||
def test_add_invalid_media_type(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
tag = registry_model.get_repo_tag(repo_ref, 'prod')
|
||||
repository = ADMIN_ACCESS_USER + '/complex'
|
||||
|
||||
self.postResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='hello', value='world', media_type='some/invalid'),
|
||||
expected_code=400)
|
||||
|
||||
def test_add_invalid_key(self):
|
||||
self.login(ADMIN_ACCESS_USER)
|
||||
|
||||
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
|
||||
repo_ref = registry_model.lookup_repository(ADMIN_ACCESS_USER, 'complex')
|
||||
tag = registry_model.get_repo_tag(repo_ref, 'prod')
|
||||
repository = ADMIN_ACCESS_USER + '/complex'
|
||||
|
||||
# Try to add an empty label key.
|
||||
self.postResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='', value='world'), expected_code=400)
|
||||
|
||||
# Try to add an invalid label key.
|
||||
self.postResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='invalid___key', value='world'), expected_code=400)
|
||||
|
||||
# Try to add a label key in a reserved namespace.
|
||||
self.postResponse(RepositoryManifestLabels, params=dict(repository=repository,
|
||||
manifestref=tag_manifest.digest),
|
||||
manifestref=tag.manifest_digest),
|
||||
data=dict(key='io.docker.whatever', value='world'), expected_code=400)
|
||||
|
||||
|
||||
|
|
|
@ -1,99 +0,0 @@
|
|||
from collections import defaultdict
|
||||
|
||||
|
||||
class ImageTreeNode(object):
|
||||
""" A node in the image tree. """
|
||||
def __init__(self, image, child_map):
|
||||
self.image = image
|
||||
self.parent = None
|
||||
self.tags = []
|
||||
|
||||
self._child_map = child_map
|
||||
|
||||
@property
|
||||
def children(self):
|
||||
return self._child_map[self.image.id]
|
||||
|
||||
def add_tag(self, tag):
|
||||
self.tags.append(tag)
|
||||
|
||||
|
||||
class ImageTree(object):
|
||||
""" In-memory tree for easy traversal and lookup of images in a repository. """
|
||||
|
||||
def __init__(self, all_images, all_tags, base_filter=None):
|
||||
self._image_map = {}
|
||||
self._child_map = defaultdict(list)
|
||||
|
||||
self._build(all_images, all_tags, base_filter)
|
||||
|
||||
def _build(self, all_images, all_tags, base_filter=None):
|
||||
# Build nodes for each of the images.
|
||||
for image in all_images:
|
||||
ancestors = image.ancestor_id_list()
|
||||
|
||||
# Filter any unneeded images.
|
||||
if base_filter is not None:
|
||||
if image.id != base_filter and not base_filter in ancestors:
|
||||
continue
|
||||
|
||||
# Create the node for the image.
|
||||
image_node = ImageTreeNode(image, self._child_map)
|
||||
self._image_map[image.id] = image_node
|
||||
|
||||
# Add the node to the child map for its parent image (if any).
|
||||
parent_image_id = image.parent_id
|
||||
if parent_image_id is not None:
|
||||
self._child_map[parent_image_id].append(image_node)
|
||||
|
||||
# Build the tag map.
|
||||
for tag in all_tags:
|
||||
image_node = self._image_map.get(tag.image.id)
|
||||
if not image_node:
|
||||
continue
|
||||
|
||||
image_node.add_tag(tag.name)
|
||||
|
||||
def find_longest_path(self, image_id, checker):
|
||||
""" Returns a list of images representing the longest path that matches the given
|
||||
checker function, starting from the given image_id *exclusive*.
|
||||
"""
|
||||
start_node = self._image_map.get(image_id)
|
||||
if not start_node:
|
||||
return []
|
||||
|
||||
return self._find_longest_path(start_node, checker, -1)[1:]
|
||||
|
||||
def _find_longest_path(self, image_node, checker, index):
|
||||
found_path = []
|
||||
|
||||
for child_node in image_node.children:
|
||||
if not checker(index + 1, child_node.image):
|
||||
continue
|
||||
|
||||
found = self._find_longest_path(child_node, checker, index + 1)
|
||||
if found and len(found) > len(found_path):
|
||||
found_path = found
|
||||
|
||||
return [image_node.image] + found_path
|
||||
|
||||
def tag_containing_image(self, image):
|
||||
""" Returns the name of the closest tag containing the given image. """
|
||||
if not image:
|
||||
return None
|
||||
|
||||
# Check the current image for a tag.
|
||||
image_node = self._image_map.get(image.id)
|
||||
if image_node is None:
|
||||
return None
|
||||
|
||||
if image_node.tags:
|
||||
return image_node.tags[0]
|
||||
|
||||
# Check any deriving images for a tag.
|
||||
for child_node in image_node.children:
|
||||
found = self.tag_containing_image(child_node.image)
|
||||
if found is not None:
|
||||
return found
|
||||
|
||||
return None
|
|
@ -1,82 +0,0 @@
|
|||
from data import model
|
||||
from util.imagetree import ImageTree
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
NAMESPACE = 'devtable'
|
||||
SIMPLE_REPO = 'simple'
|
||||
COMPLEX_REPO = 'complex'
|
||||
|
||||
def _get_base_image(all_images):
|
||||
for image in all_images:
|
||||
if image.ancestors == '/':
|
||||
return image
|
||||
|
||||
return None
|
||||
|
||||
def test_longest_path_simple_repo(initialized_db):
|
||||
all_images = list(model.image.get_repository_images(NAMESPACE, SIMPLE_REPO))
|
||||
all_tags = list(model.tag.list_repository_tags(NAMESPACE, SIMPLE_REPO))
|
||||
tree = ImageTree(all_images, all_tags)
|
||||
|
||||
base_image = _get_base_image(all_images)
|
||||
tag_image = all_tags[0].image
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
ancestors = tag_image.ancestors.split('/')[2:-1] # Skip the first image.
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
assert len(result) == 3
|
||||
for index in range(0, 2):
|
||||
assert result[index].id == int(ancestors[index])
|
||||
|
||||
assert tree.tag_containing_image(result[-1]) == 'latest'
|
||||
|
||||
def test_longest_path_complex_repo(initialized_db):
|
||||
all_images = list(model.image.get_repository_images(NAMESPACE, COMPLEX_REPO))
|
||||
all_tags = list(model.tag.list_repository_tags(NAMESPACE, COMPLEX_REPO))
|
||||
tree = ImageTree(all_images, all_tags)
|
||||
|
||||
base_image = _get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
assert len(result) == 5
|
||||
assert tree.tag_containing_image(result[-1]) == 'prod'
|
||||
|
||||
def test_filtering(initialized_db):
|
||||
all_images = list(model.image.get_repository_images(NAMESPACE, COMPLEX_REPO))
|
||||
all_tags = list(model.tag.list_repository_tags(NAMESPACE, COMPLEX_REPO))
|
||||
tree = ImageTree(all_images, all_tags, base_filter=1245)
|
||||
|
||||
base_image = _get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
assert len(result) == 0
|
||||
|
||||
def test_longest_path_simple_repo_direct_lookup(initialized_db):
|
||||
repository = model.repository.get_repository(NAMESPACE, SIMPLE_REPO)
|
||||
all_images = list(model.image.get_repository_images(NAMESPACE, SIMPLE_REPO))
|
||||
all_tags = list(model.tag.list_repository_tags(NAMESPACE, SIMPLE_REPO))
|
||||
|
||||
base_image = _get_base_image(all_images)
|
||||
|
||||
def checker(index, image):
|
||||
return True
|
||||
|
||||
filtered_images = model.image.get_repository_images_without_placements(
|
||||
repository,
|
||||
with_ancestor=base_image)
|
||||
assert set([a.id for a in all_images]) == set([f.id for f in filtered_images])
|
||||
|
||||
tree = ImageTree(filtered_images, all_tags)
|
||||
|
||||
result = tree.find_longest_path(base_image.id, checker)
|
||||
assert len(result) == 3
|
||||
assert tree.tag_containing_image(result[-1]) == 'latest'
|
Reference in a new issue