Add a batch get_matching_tags_for_images method
This will be used in the security notification worker to retrieve the tags needed in a set of batch calls, rather than multiple calls per image.
parent e583be3914
commit 74dd0ef8e8
4 changed files with 162 additions and 36 deletions
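
Before the diff, a minimal sketch of how a caller such as the security notification worker might consume the new batch method. The worker-side names (layer_pairs, the notification loop body) are hypothetical; only get_matching_tags_for_images, the model classes, and the selections parameter come from this commit.

# Hypothetical caller-side sketch, not part of the commit.
from data.database import Repository, RepositoryTag, Namespace
from data.model.tag import get_matching_tags_for_images

def notify_tags_for_vulnerability(layer_pairs):
  # layer_pairs: iterable of (docker_image_id, storage_uuid) tuples taken
  # from a security notification. One batched call resolves the tags for
  # every affected image, instead of one get_matching_tags() call per image.
  tags = get_matching_tags_for_images(
      layer_pairs, selections=[RepositoryTag, Repository, Namespace])
  for tag in tags:
    pass  # e.g. enqueue a notification for the tag's repository (hypothetical)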
@@ -6,6 +6,29 @@ from data.database import (Repository, User, Team, TeamMember, RepositoryPermiss
                            Namespace, Visibility, ImageStorage, Image, RepositoryKind,
                            db_for_update)
 
 
+def reduce_as_tree(queries_to_reduce):
+  """ This method will split a list of queries into halves recursively until we reach individual
+      queries, at which point it will start unioning the queries, or the already unioned subqueries.
+
+      This works around a bug in peewee SQL generation where reducing linearly generates a chain
+      of queries that will exceed the recursion depth limit when it has around 80 queries.
+  """
+  mid = len(queries_to_reduce)/2
+  left = queries_to_reduce[:mid]
+  right = queries_to_reduce[mid:]
+
+  to_reduce_right = right[0]
+  if len(right) > 1:
+    to_reduce_right = reduce_as_tree(right)
+
+  if len(left) > 1:
+    to_reduce_left = reduce_as_tree(left)
+  elif len(left) == 1:
+    to_reduce_left = left[0]
+  else:
+    return to_reduce_right
+
+  return to_reduce_left.union_all(to_reduce_right)
+
+
 def get_existing_repository(namespace_name, repository_name, for_update=False, kind_filter=None):
   query = (Repository
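
To make the recursion-depth workaround concrete, here is a standalone sketch (not part of the commit) that models union nesting with a toy Q class. Peewee generates SQL recursively, one level per nesting, so a left-deep chain of ~100 unions recurses ~100 levels deep, while the balanced tree stays near log2(N):

from functools import reduce  # built in on Python 2, explicit import on Python 3


class Q(object):
  def __init__(self, depth=1):
    self.depth = depth

  def union_all(self, other):
    # A union is one level deeper than its deepest operand.
    return Q(1 + max(self.depth, other.depth))


def reduce_as_tree(qs):
  # Simplified form of the helper added above; assumes qs is non-empty.
  if len(qs) == 1:
    return qs[0]
  mid = len(qs) // 2
  return reduce_as_tree(qs[:mid]).union_all(reduce_as_tree(qs[mid:]))


queries = [Q() for _ in range(100)]
print(reduce(lambda l, r: l.union_all(r), queries).depth)  # 100: left-deep chain
print(reduce_as_tree(queries).depth)                       # 8: balanced tree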
@@ -68,7 +68,8 @@ def _orphaned_storage_query(candidate_ids):
                   .from_(storage_subq))
 
   # Build the set of storages that are missing. These storages are orphaned.
-  nonorphaned_storage_ids = {storage.id for storage in _reduce_as_tree(nonorphaned_queries)}
+  nonorphaned_storage_ids = {storage.id for storage
+                             in _basequery.reduce_as_tree(nonorphaned_queries)}
   return list(candidate_ids - nonorphaned_storage_ids)
 
@@ -275,31 +276,7 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
              .select(SQL('*'))
              .from_(candidate_subq))
 
-  return _reduce_as_tree(queries)
-
-
-def _reduce_as_tree(queries_to_reduce):
-  """ This method will split a list of queries into halves recursively until we reach individual
-      queries, at which point it will start unioning the queries, or the already unioned subqueries.
-
-      This works around a bug in peewee SQL generation where reducing linearly generates a chain
-      of queries that will exceed the recursion depth limit when it has around 80 queries.
-  """
-  mid = len(queries_to_reduce)/2
-  left = queries_to_reduce[:mid]
-  right = queries_to_reduce[mid:]
-
-  to_reduce_right = right[0]
-  if len(right) > 1:
-    to_reduce_right = _reduce_as_tree(right)
-
-  if len(left) > 1:
-    to_reduce_left = _reduce_as_tree(left)
-  elif len(left) == 1:
-    to_reduce_left = left[0]
-  else:
-    return to_reduce_right
-
-  return to_reduce_left.union_all(to_reduce_right)
+  return _basequery.reduce_as_tree(queries)
 
 
 def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
@@ -53,6 +53,73 @@ def _tag_alive(query, now_ts=None):
                           (RepositoryTag.lifetime_end_ts > now_ts))
 
 
+_MAX_SUB_QUERIES = 100
+
+
+def get_matching_tags_for_images(image_pairs, filter_query=None, selections=None):
+  """ Returns all tags that contain the images with the given docker_image_id and storage_uuid,
+      as specified as an iterable of pairs. """
+  if not image_pairs:
+    return []
+
+  image_pairs = set(image_pairs)
+
+  # Find all possible matching image+storages.
+  ids = [image_pair[0] for image_pair in image_pairs]
+  uuids = [image_pair[1] for image_pair in image_pairs]
+  images_query = (Image
+                  .select(Image.id, Image.docker_image_id, Image.ancestors, ImageStorage.uuid)
+                  .join(ImageStorage)
+                  .where(Image.docker_image_id << ids, ImageStorage.uuid << uuids))
+
+  # Filter down to those images actually in the pairs set and build the set of queries to run.
+  individual_image_queries = []
+
+  for img in images_query:
+    # Make sure the actual image was requested.
+    pair = (img.docker_image_id, img.storage.uuid)
+    if pair not in image_pairs:
+      continue
+
+    # Remove the pair so we don't try it again.
+    image_pairs.remove(pair)
+
+    ancestors_str = '%s%s/%%' % (img.ancestors, img.id)
+    query = (Image
+             .select(Image.id)
+             .where((Image.id == img.id) | (Image.ancestors ** ancestors_str)))
+
+    individual_image_queries.append(query)
+
+  if not individual_image_queries:
+    return []
+
+  # Shard based on the max subquery count. This is used to prevent going over the DB's max query
+  # size, as well as to prevent the DB from locking up on a massive query.
+  sharded_queries = []
+  while individual_image_queries:
+    shard = individual_image_queries[0:_MAX_SUB_QUERIES]
+    sharded_queries.append(_basequery.reduce_as_tree(shard))
+    individual_image_queries = individual_image_queries[_MAX_SUB_QUERIES:]
+
+  # Collect IDs of the tags found for each query.
+  tags = {}
+  for query in sharded_queries:
+    tag_query = (_tag_alive(RepositoryTag
+                            .select(*(selections or []))
+                            .distinct()
+                            .join(Image)
+                            .where(RepositoryTag.hidden == False)
+                            .where(Image.id << query)))
+
+    if filter_query is not None:
+      tag_query = filter_query(tag_query)
+
+    for tag in tag_query:
+      tags[tag.id] = tag
+
+  return tags.values()
+
+
 def get_matching_tags(docker_image_id, storage_uuid, *args):
   """ Returns a query pointing to all tags that contain the image with the
       given docker_image_id and storage_uuid. """
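
As a usage sketch of the method added above (mirroring the tests in the last file of this commit; the repository name 'simple' is an example taken from the test fixtures):

# Resolve the alive tags for a batch of (docker_image_id, storage_uuid)
# pairs, optionally narrowed to a single repository via filter_query.
def filter_query(query):
  return query.join(Repository).where(Repository.name == 'simple')

pairs = [(image.docker_image_id, image.storage.uuid)
         for image in Image.select(Image, ImageStorage).join(ImageStorage)]

tags = get_matching_tags_for_images(pairs, filter_query=filter_query,
                                    selections=[RepositoryTag, Repository])
tag_names = set(tag.name for tag in tags)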
@@ -1,25 +1,84 @@
-from data.database import Image, RepositoryTag
+from data.database import Image, RepositoryTag, ImageStorage, Repository
 from data.model.repository import create_repository
 from data.model.tag import (list_active_repo_tags, create_or_update_tag, delete_tag,
-                            get_matching_tags, _tag_alive)
+                            get_matching_tags, _tag_alive, get_matching_tags_for_images)
 from data.model.image import find_create_or_link_image
 
 from test.fixtures import *
 
 
+def _get_expected_tags(image):
+  expected_query = (RepositoryTag
+                    .select()
+                    .join(Image)
+                    .where(RepositoryTag.hidden == False)
+                    .where((Image.id == image.id) | (Image.ancestors ** ('%%/%s/%%' % image.id))))
+  return set([tag.id for tag in _tag_alive(expected_query)])
+
+
 def test_get_matching_tags(initialized_db):
   # Test for every image in the test database.
-  for image in Image.select():
+  for image in Image.select(Image, ImageStorage).join(ImageStorage):
     matching_query = get_matching_tags(image.docker_image_id, image.storage.uuid)
-    expected_query = (RepositoryTag
-                      .select()
-                      .join(Image)
-                      .where(RepositoryTag.hidden == False)
-                      .where((Image.id == image.id) | (Image.ancestors ** ('%%/%s/%%' % image.id))))
-
     matching_tags = set([tag.id for tag in matching_query])
-    expected_tags = set([tag.id for tag in _tag_alive(expected_query)])
+    expected_tags = _get_expected_tags(image)
     assert matching_tags == expected_tags, "mismatch for image %s" % image.id
 
 
+def test_get_matching_tag_ids_for_images(initialized_db):
+  # Try for various sets of the first N images.
+  for count in [5, 10, 15]:
+    pairs = []
+    expected_tags_ids = set()
+    for image in Image.select(Image, ImageStorage).join(ImageStorage):
+      if len(pairs) >= count:
+        break
+
+      pairs.append((image.docker_image_id, image.storage.uuid))
+      expected_tags_ids.update(_get_expected_tags(image))
+
+    matching_tags_ids = set([tag.id for tag in get_matching_tags_for_images(pairs)])
+    assert matching_tags_ids == expected_tags_ids
+
+
+def test_get_matching_tag_ids_for_all_images(initialized_db):
+  pairs = []
+  for image in Image.select(Image, ImageStorage).join(ImageStorage):
+    pairs.append((image.docker_image_id, image.storage.uuid))
+
+  expected_tags_ids = set([tag.id for tag in _tag_alive(RepositoryTag.select())])
+  matching_tags_ids = set([tag.id for tag in get_matching_tags_for_images(pairs)])
+
+  # Ensure every alive tag was found.
+  assert matching_tags_ids == expected_tags_ids
+
+
+def test_get_matching_tag_ids_images_filtered(initialized_db):
+  def filter_query(query):
+    return query.join(Repository).where(Repository.name == 'simple')
+
+  filtered_images = filter_query(Image
+                                 .select(Image, ImageStorage)
+                                 .join(RepositoryTag)
+                                 .switch(Image)
+                                 .join(ImageStorage)
+                                 .switch(Image))
+
+  expected_tags_query = _tag_alive(filter_query(RepositoryTag
+                                                .select()))
+
+  pairs = []
+  for image in filtered_images:
+    pairs.append((image.docker_image_id, image.storage.uuid))
+
+  matching_tags = get_matching_tags_for_images(pairs, filter_query=filter_query)
+
+  expected_tag_ids = set([tag.id for tag in expected_tags_query])
+  matching_tags_ids = set([tag.id for tag in matching_tags])
+
+  # Ensure every alive tag was found.
+  assert matching_tags_ids == expected_tag_ids
+
+
 def assert_tags(repository, *args):
   tags = list(list_active_repo_tags(repository))
   assert len(tags) == len(args)