import logging from peewee import JOIN_LEFT_OUTER, fn, SQL from data.model import config, db_transaction, InvalidImageException from data.database import (ImageStorage, Image, DerivedStorageForImage, ImageStoragePlacement, ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, ImageStorageSignatureKind, Repository, Namespace) logger = logging.getLogger(__name__) def add_storage_placement(storage, location_name): """ Adds a storage placement for the given storage at the given location. """ location = ImageStorageLocation.get(name=location_name) ImageStoragePlacement.create(location=location, storage=storage) def garbage_collect_storage(storage_id_whitelist): if len(storage_id_whitelist) == 0: return def placements_query_to_paths_set(placements_query): return {(placement.location.name, get_layer_path(placement.storage)) for placement in placements_query} def orphaned_storage_query(select_base_query, candidates, group_by): return (select_base_query .switch(ImageStorage) .join(Image, JOIN_LEFT_OUTER) .switch(ImageStorage) .join(DerivedStorageForImage, JOIN_LEFT_OUTER, on=(ImageStorage.id == DerivedStorageForImage.derivative)) .where(ImageStorage.id << list(candidates)) .group_by(*group_by) .having((fn.Count(Image.id) == 0) & (fn.Count(DerivedStorageForImage.id) == 0))) # Note: Both of these deletes must occur in the same transaction (unfortunately) because a # storage without any placement is invalid, and a placement cannot exist without a storage. # TODO(jake): We might want to allow for null storages on placements, which would allow us to # delete the storages, then delete the placements in a non-transaction. logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist) with db_transaction(): # Track all of the data that should be removed from blob storage placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation) .join(ImageStorageLocation) .switch(ImageStoragePlacement) .join(ImageStorage), storage_id_whitelist, (ImageStorage, ImageStoragePlacement, ImageStorageLocation))) paths_to_remove = placements_query_to_paths_set(placements_to_remove) # Remove the placements for orphaned storages if len(placements_to_remove) > 0: placement_ids_to_remove = [placement.id for placement in placements_to_remove] placements_removed = (ImageStoragePlacement .delete() .where(ImageStoragePlacement.id << placement_ids_to_remove) .execute()) logger.debug('Removed %s image storage placements', placements_removed) # Remove all orphaned storages # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id), storage_id_whitelist, (ImageStorage.id,)).alias('osq')) if len(orphaned_storages) > 0: storages_removed = (ImageStorage .delete() .where(ImageStorage.id << orphaned_storages) .execute()) logger.debug('Removed %s image storage records', storages_removed) # We are going to make the conscious decision to not delete image storage blobs inside # transactions. # This may end up producing garbage in s3, trading off for higher availability in the database. for location_name, image_path in paths_to_remove: logger.debug('Removing %s from %s', image_path, location_name) config.store.remove({location_name}, image_path) def create_v1_storage(location_name): storage = ImageStorage.create(cas_path=False) location = ImageStorageLocation.get(name=location_name) ImageStoragePlacement.create(location=location, storage=storage) storage.locations = {location_name} return storage def find_or_create_storage_signature(storage, signature_kind): found = lookup_storage_signature(storage, signature_kind) if found is None: kind = ImageStorageSignatureKind.get(name=signature_kind) found = ImageStorageSignature.create(storage=storage, kind=kind) return found def lookup_storage_signature(storage, signature_kind): kind = ImageStorageSignatureKind.get(name=signature_kind) try: return (ImageStorageSignature .select() .where(ImageStorageSignature.storage == storage, ImageStorageSignature.kind == kind) .get()) except ImageStorageSignature.DoesNotExist: return None def _get_storage(query_modifier): query = (ImageStoragePlacement .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation) .join(ImageStorageLocation) .switch(ImageStoragePlacement) .join(ImageStorage)) placements = list(query_modifier(query)) if not placements: raise InvalidImageException() found = placements[0].storage found.locations = {placement.location.name for placement in placements} return found def get_storage_by_subquery(subquery): """ Returns the storage (and its locations) for the storage id returned by the subquery. The subquery must return at most 1 result, which is a storage ID. """ def filter_by_subquery(query): return query.where(ImageStorage.id == subquery) return _get_storage(filter_by_subquery) def get_storage_by_uuid(storage_uuid): def filter_to_uuid(query): return query.where(ImageStorage.uuid == storage_uuid) try: return _get_storage(filter_to_uuid) except InvalidImageException: raise InvalidImageException('No storage found with uuid: %s', storage_uuid) def get_layer_path(storage_record): """ Returns the path in the storage engine to the layer data referenced by the storage row. """ store = config.store if not storage_record.cas_path: logger.debug('Serving layer from legacy v1 path') return store.v1_image_layer_path(storage_record.uuid) return store.blob_path(storage_record.content_checksum) def lookup_repo_storages_by_content_checksum(repo, checksums): """ Looks up repository storages (without placements) matching the given repository and checksum. """ # There may be many duplicates of the checksums, so for performance reasons we are going # to use a union to select just one storage with each checksum queries = [] for counter, checksum in enumerate(set(checksums)): query_alias = 'q{0}'.format(counter) candidate_subq = (ImageStorage .select(ImageStorage.id, ImageStorage.content_checksum, ImageStorage.image_size) .join(Image) .where(Image.repository == repo, ImageStorage.content_checksum == checksum) .limit(1) .alias(query_alias)) queries.append(ImageStorage .select(SQL('*')) .from_(candidate_subq)) return _reduce_as_tree(queries) def _reduce_as_tree(queries_to_reduce): """ This method will split a list of queries into halves recursively until we reach individual queries, at which point it will start unioning the queries, or the already unioned subqueries. This works around a bug in peewee SQL generation where reducing linearly generates a chain of queries that will exceed the recursion depth limit when it has around 80 queries. """ mid = len(queries_to_reduce)/2 left = queries_to_reduce[:mid] right = queries_to_reduce[mid:] to_reduce_right = right[0] if len(right) > 1: to_reduce_right = _reduce_as_tree(right) if len(left) > 1: to_reduce_left = _reduce_as_tree(left) elif len(left) == 1: to_reduce_left = left[0] else: return to_reduce_right return to_reduce_left.union_all(to_reduce_right) def get_storage_locations(uuid): query = (ImageStoragePlacement .select() .join(ImageStorageLocation) .switch(ImageStoragePlacement) .join(ImageStorage) .where(ImageStorage.uuid == uuid)) return [location.location.name for location in query]