5225642850
Add support to GC to invoke a callback with the images and storages removed. Only images whose storage was also removed will be sent to the callback. This will be used by security scanning for its own GC in a follow-up change.
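As a minimal sketch of the intended consumer (the callback wiring itself lands in the follow-up change; the handler name and candidate set below are hypothetical), security scanning could react to the IDs now returned by garbage_collect_storage like this:

  def on_storages_removed(removed_storage_ids):
    # Hypothetical follow-up handler: purge security scan state for blobs
    # that no longer exist in storage.
    for storage_id in removed_storage_ids:
      logger.debug('Cleaning up scan data for storage %s', storage_id)

  removed_ids = garbage_collect_storage(candidate_storage_ids)
  on_storages_removed(removed_ids)
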
import logging

from collections import namedtuple

from cachetools import lru_cache
from peewee import SQL, IntegrityError

from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist,
                        DataModelException, _basequery)
from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
                           ImageStorageTransformation, ImageStorageSignature,
                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)


logger = logging.getLogger(__name__)

_Location = namedtuple('location', ['id', 'name'])


@lru_cache(maxsize=1)
def get_image_locations():
  location_map = {}
  for location in ImageStorageLocation.select():
    location_tuple = _Location(location.id, location.name)
    location_map[location.id] = location_tuple
    location_map[location.name] = location_tuple

  return location_map


def get_image_location_for_name(location_name):
  locations = get_image_locations()
  return locations[location_name]


def get_image_location_for_id(location_id):
  locations = get_image_locations()
  return locations[location_id]


def add_storage_placement(storage, location_name):
  """ Adds a storage placement for the given storage at the given location. """
  location = get_image_location_for_name(location_name)
  try:
    ImageStoragePlacement.create(location=location.id, storage=storage)
  except IntegrityError:
    # Placement already exists. Nothing to do.
    pass


def _orphaned_storage_query(candidate_ids):
  """ Returns the subset of the candidate ImageStorage IDs representing storages that are no
      longer referenced by images.
  """
  # Issue a union query to find all candidate storages that are still referenced by at least one
  # image. This is much faster than the group_by and having call we used to use here.
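  # For example, for candidate IDs (10, 11) the reduced query is roughly of the form below
  # (illustrative only; peewee generates the actual SQL):
  #   SELECT * FROM (SELECT imagestorage.id FROM imagestorage JOIN image ...
  #                  WHERE imagestorage.id = 10 LIMIT 1) q0
  #   UNION ALL
  #   SELECT * FROM (... WHERE imagestorage.id = 11 LIMIT 1) q1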
  nonorphaned_queries = []
  for counter, candidate_id in enumerate(candidate_ids):
    query_alias = 'q{0}'.format(counter)
    storage_subq = (ImageStorage
                    .select(ImageStorage.id)
                    .join(Image)
                    .where(ImageStorage.id == candidate_id)
                    .limit(1)
                    .alias(query_alias))

    nonorphaned_queries.append(ImageStorage
                               .select(SQL('*'))
                               .from_(storage_subq))

  # Candidates that did not appear in the results are no longer referenced; they are orphaned.
  nonorphaned_storage_ids = {storage.id for storage in _reduce_as_tree(nonorphaned_queries)}
  return list(candidate_ids - nonorphaned_storage_ids)


def garbage_collect_storage(storage_id_whitelist):
  """ Performs GC on a possible subset of the storages with the IDs found in the whitelist.
      The storages in the whitelist will be checked, and any orphaned ones will be removed,
      with those IDs being returned.
  """
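  # Example (illustrative IDs): given the whitelist {10, 11}, where only storage 11 is no longer
  # referenced by any image, this removes storage 11 (along with its placements, signatures,
  # torrent info and backing blobs) and returns [11].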
  if len(storage_id_whitelist) == 0:
    return []

  def placements_query_to_paths_set(placements_query):
    return {(get_image_location_for_id(placement.location_id).name,
             get_layer_path(placement.storage))
            for placement in placements_query}

  # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
  # storage without any placement is invalid, and a placement cannot exist without a storage.
  # TODO(jake): We might want to allow for null storages on placements, which would allow us to
  # delete the storages, then delete the placements in a non-transaction.
  logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
  with db_transaction():
    orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
    if len(orphaned_storage_ids) == 0:
      # Nothing to GC.
      return []

    placements_to_remove = list(ImageStoragePlacement
                                .select()
                                .join(ImageStorage)
                                .where(ImageStorage.id << orphaned_storage_ids))

    paths_to_remove = placements_query_to_paths_set(placements_to_remove)

    # Remove the placements for orphaned storages
    if len(placements_to_remove) > 0:
      placement_ids_to_remove = [placement.id for placement in placements_to_remove]
      placements_removed = (ImageStoragePlacement
                            .delete()
                            .where(ImageStoragePlacement.id << placement_ids_to_remove)
                            .execute())
      logger.debug('Removed %s image storage placements', placements_removed)

    # Remove all orphaned storages
    torrents_removed = (TorrentInfo
                        .delete()
                        .where(TorrentInfo.storage << orphaned_storage_ids)
                        .execute())
    logger.debug('Removed %s torrent info records', torrents_removed)

    signatures_removed = (ImageStorageSignature
                          .delete()
                          .where(ImageStorageSignature.storage << orphaned_storage_ids)
                          .execute())
    logger.debug('Removed %s image storage signatures', signatures_removed)

    storages_removed = (ImageStorage
                        .delete()
                        .where(ImageStorage.id << orphaned_storage_ids)
                        .execute())
    logger.debug('Removed %s image storage records', storages_removed)

  # We are going to make the conscious decision to not delete image storage blobs inside
  # transactions. This may end up producing garbage in s3, trading off for higher availability
  # in the database.
  for location_name, image_path in paths_to_remove:
    logger.debug('Removing %s from %s', image_path, location_name)
    config.store.remove({location_name}, image_path)

  return orphaned_storage_ids


def create_v1_storage(location_name):
  """ Creates a new (non-CAS) storage with a placement at the given location. """
  storage = ImageStorage.create(cas_path=False)
  location = get_image_location_for_name(location_name)
  ImageStoragePlacement.create(location=location.id, storage=storage)
  storage.locations = {location_name}
  return storage


def find_or_create_storage_signature(storage, signature_kind_name):
  found = lookup_storage_signature(storage, signature_kind_name)
  if found is None:
    kind = ImageStorageSignatureKind.get(name=signature_kind_name)
    found = ImageStorageSignature.create(storage=storage, kind=kind)

  return found


def lookup_storage_signature(storage, signature_kind_name):
  kind = ImageStorageSignatureKind.get(name=signature_kind_name)
  try:
    return (ImageStorageSignature
            .select()
            .where(ImageStorageSignature.storage == storage, ImageStorageSignature.kind == kind)
            .get())
  except ImageStorageSignature.DoesNotExist:
    return None


def _get_storage(query_modifier):
  """ Resolves a single storage row (with its set of location names attached) from the
      placements matched by the modified query. Raises InvalidImageException if none match.
  """
  query = (ImageStoragePlacement
           .select(ImageStoragePlacement, ImageStorage)
           .switch(ImageStoragePlacement)
           .join(ImageStorage))

  placements = list(query_modifier(query))

  if not placements:
    raise InvalidImageException()

  found = placements[0].storage
  found.locations = {get_image_location_for_id(placement.location_id).name
                     for placement in placements}
  return found


def get_storage_by_subquery(subquery):
  """ Returns the storage (and its locations) for the storage id returned by the subquery. The
      subquery must return at most 1 result, which is a storage ID. """
  def filter_by_subquery(query):
    return query.where(ImageStorage.id == subquery)

  return _get_storage(filter_by_subquery)


def get_storage_by_uuid(storage_uuid):
  def filter_to_uuid(query):
    return query.where(ImageStorage.uuid == storage_uuid)

  try:
    return _get_storage(filter_to_uuid)
  except InvalidImageException:
    raise InvalidImageException('No storage found with uuid: %s' % storage_uuid)


def get_layer_path(storage_record):
  """ Returns the path in the storage engine to the layer data referenced by the storage row. """
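  # Two layouts are in play (the actual path shapes are defined by the storage engine): legacy
  # v1 storages are addressed by the storage row's UUID, while CAS storages are addressed by
  # their sha256 content checksum.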
  store = config.store
  if not storage_record.cas_path:
    logger.debug('Serving layer from legacy v1 path')
    return store.v1_image_layer_path(storage_record.uuid)

  return store.blob_path(storage_record.content_checksum)


def lookup_repo_storages_by_content_checksum(repo, checksums):
  """ Looks up repository storages (without placements) matching the given repository
      and checksums. """
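  # Example usage (illustrative; the digests are hypothetical). Duplicate checksums in the input
  # are collapsed, and at most one storage row is returned per checksum:
  #   storages = lookup_repo_storages_by_content_checksum(repo,
  #                                                       ['sha256:abc...', 'sha256:abc...'])
  #   by_checksum = {storage.content_checksum: storage for storage in storages}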
  # There may be many duplicates of the checksums, so for performance reasons we are going
  # to use a union to select just one storage with each checksum
  queries = []

  for counter, checksum in enumerate(set(checksums)):
    query_alias = 'q{0}'.format(counter)
    candidate_subq = (ImageStorage
                      .select(ImageStorage.id, ImageStorage.content_checksum,
                              ImageStorage.image_size, ImageStorage.uuid)
                      .join(Image)
                      .where(Image.repository == repo, ImageStorage.content_checksum == checksum)
                      .limit(1)
                      .alias(query_alias))
    queries.append(ImageStorage
                   .select(SQL('*'))
                   .from_(candidate_subq))

  return _reduce_as_tree(queries)


def _reduce_as_tree(queries_to_reduce):
  """ Splits a list of queries into halves recursively until individual queries are reached,
      then unions the queries (or the already-unioned subqueries) back together. This works
      around a bug in peewee SQL generation where reducing linearly produces a chain of queries
      that exceeds the recursion depth limit at around 80 queries.
  """
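  # Shape of the reduction (illustrative): [q0, q1, q2, q3] becomes
  # (q0 UNION ALL q1) UNION ALL (q2 UNION ALL q3) -- a balanced tree of depth log2(n) rather
  # than a left-deep chain of depth n.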
  # Floor division keeps the slice index an integer (required under Python 3 as well).
  mid = len(queries_to_reduce) // 2
  left = queries_to_reduce[:mid]
  right = queries_to_reduce[mid:]

  to_reduce_right = right[0]
  if len(right) > 1:
    to_reduce_right = _reduce_as_tree(right)

  if len(left) > 1:
    to_reduce_left = _reduce_as_tree(left)
  elif len(left) == 1:
    to_reduce_left = left[0]
  else:
    return to_reduce_right

  return to_reduce_left.union_all(to_reduce_right)


def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
                               uncompressed_size):
  """ Sets metadata that is specific to the binary storage of the data, irrespective of how it
      is used in the layer tree.
  """
  if image_size is None:
    raise DataModelException('Empty image size field')

  try:
    image = (Image
             .select(Image, ImageStorage)
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .switch(Image)
             .join(ImageStorage)
             .where(Repository.name == repository_name, Namespace.username == namespace_name,
                    Image.docker_image_id == docker_image_id)
             .get())
  except Image.DoesNotExist:
    raise InvalidImageException('No image with specified id and repository')

  # We MUST do this here; it cannot be done in the corresponding image call because the storage
  # has not yet been pushed.
  image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size,
                                                                   image.parent)
  image.save()

  image.storage.image_size = image_size
  image.storage.uncompressed_size = uncompressed_size
  image.storage.save()
  return image.storage


def get_storage_locations(uuid):
  query = (ImageStoragePlacement
           .select()
           .join(ImageStorage)
           .where(ImageStorage.uuid == uuid))

  return [get_image_location_for_id(placement.location_id).name for placement in query]


def save_torrent_info(storage_object, piece_length, pieces):
  try:
    TorrentInfo.create(storage=storage_object, piece_length=piece_length, pieces=pieces)
  except IntegrityError:
    # TorrentInfo already exists for this storage.
    pass


def get_torrent_info(blob):
  try:
    return (TorrentInfo
            .select()
            .where(TorrentInfo.storage == blob)
            .get())
  except TorrentInfo.DoesNotExist:
    raise TorrentInfoDoesNotExist