import logging

from peewee import SQL, IntegrityError
from cachetools import lru_cache
from collections import namedtuple

from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist,
                        DataModelException, _basequery)
from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
                           ImageStorageTransformation, ImageStorageSignature,
                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)


logger = logging.getLogger(__name__)

_Location = namedtuple('location', ['id', 'name'])


@lru_cache(maxsize=1)
def get_image_locations():
    """ Returns a map of all image storage locations, keyed by both location ID and location name.

        Each value is a `_Location(id, name)` tuple. The result is memoized via `lru_cache`, so the
        ImageStorageLocation table is only read on the first call.
    """
    location_map = {}
    for location in ImageStorageLocation.select():
        location_tuple = _Location(location.id, location.name)
        # The same tuple is registered under both keys so callers can look up by either one.
        location_map[location.id] = location_tuple
        location_map[location.name] = location_tuple

    return location_map


def get_image_location_for_name(location_name):
    """ Returns the `_Location` tuple for the given location name. Raises KeyError if unknown. """
    locations = get_image_locations()
    return locations[location_name]


def get_image_location_for_id(location_id):
    """ Returns the `_Location` tuple for the given location ID. Raises KeyError if unknown. """
    locations = get_image_locations()
    return locations[location_id]


def add_storage_placement(storage, location_name):
    """ Adds a storage placement for the given storage at the given location. """
    location = get_image_location_for_name(location_name)
    try:
        ImageStoragePlacement.create(location=location.id, storage=storage)
    except IntegrityError:
        # Placement already exists. Nothing to do.
        pass


def _orphaned_storage_query(candidate_ids):
    """ Returns the subset of the candidate ImageStorage IDs representing storages that are no
        longer referenced by images.

        `candidate_ids` may be any iterable of storage IDs; the result is a list. An empty input
        yields an empty list without issuing a query.
    """
    # Normalize to a set: the set difference below requires one, and it removes duplicates so we
    # issue a single subquery per candidate.
    candidate_ids = set(candidate_ids)
    if not candidate_ids:
        # _reduce_as_tree cannot reduce an empty list of queries.
        return []

    # Issue a union query to find all storages that are still referenced by a candidate storage.
    # This is much faster than the group_by and having call we used to use here.
    nonorphaned_queries = []
    for counter, candidate_id in enumerate(candidate_ids):
        query_alias = 'q{0}'.format(counter)
        storage_subq = (ImageStorage
                        .select(ImageStorage.id)
                        .join(Image)
                        .where(ImageStorage.id == candidate_id)
                        .limit(1)
                        .alias(query_alias))

        nonorphaned_queries.append(ImageStorage
                                   .select(SQL('*'))
                                   .from_(storage_subq))

    # Build the set of storages that are missing. These storages are orphaned.
    nonorphaned_storage_ids = {storage.id for storage
                               in _reduce_as_tree(nonorphaned_queries)}
    return list(candidate_ids - nonorphaned_storage_ids)


def garbage_collect_storage(storage_id_whitelist):
    """ Deletes the storages among the given candidate IDs that are no longer referenced by any
        image, along with their placements, torrent info and signatures, and then removes the
        corresponding blobs from the storage engine.

        Does nothing if `storage_id_whitelist` is empty or none of the candidates are orphaned.
    """
    if not storage_id_whitelist:
        return

    def placements_query_to_paths_set(placements_query):
        # Collapse placements into unique (location name, layer path) pairs.
        return {(get_image_location_for_id(placement.location_id).name,
                 get_layer_path(placement.storage))
                for placement in placements_query}

    # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
    # storage without any placement is invalid, and a placement cannot exist without a storage.
    # TODO(jake): We might want to allow for null storages on placements, which would allow us to
    # delete the storages, then delete the placements in a non-transaction.
    logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
    with db_transaction():
        orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
        if not orphaned_storage_ids:
            # Nothing to GC.
            return

        placements_to_remove = list(ImageStoragePlacement
                                    .select()
                                    .join(ImageStorage)
                                    .where(ImageStorage.id << orphaned_storage_ids))

        # Compute the blob paths before the rows are deleted; they are removed from the storage
        # engine only after the transaction completes.
        paths_to_remove = placements_query_to_paths_set(placements_to_remove)

        # Remove the placements for orphaned storages
        if placements_to_remove:
            placement_ids_to_remove = [placement.id for placement in placements_to_remove]
            placements_removed = (ImageStoragePlacement
                                  .delete()
                                  .where(ImageStoragePlacement.id << placement_ids_to_remove)
                                  .execute())
            logger.debug('Removed %s image storage placements', placements_removed)

        # Remove all orphaned storages
        torrents_removed = (TorrentInfo
                            .delete()
                            .where(TorrentInfo.storage << orphaned_storage_ids)
                            .execute())
        logger.debug('Removed %s torrent info records', torrents_removed)

        signatures_removed = (ImageStorageSignature
                              .delete()
                              .where(ImageStorageSignature.storage << orphaned_storage_ids)
                              .execute())
        logger.debug('Removed %s image storage signatures', signatures_removed)

        storages_removed = (ImageStorage
                            .delete()
                            .where(ImageStorage.id << orphaned_storage_ids)
                            .execute())
        logger.debug('Removed %s image storage records', storages_removed)

    # We are going to make the conscious decision to not delete image storage blobs inside
    # transactions.
    # This may end up producing garbage in s3, trading off for higher availability in the
    # database.
    for location_name, image_path in paths_to_remove:
        logger.debug('Removing %s from %s', image_path, location_name)
        config.store.remove({location_name}, image_path)


def create_v1_storage(location_name):
    """ Creates a new non-CAS (legacy v1) storage row with a single placement at the named
        location, and returns it with its `locations` attribute set to `{location_name}`.
    """
    storage = ImageStorage.create(cas_path=False)
    location = get_image_location_for_name(location_name)
    ImageStoragePlacement.create(location=location.id, storage=storage)
    storage.locations = {location_name}
    return storage


def find_or_create_storage_signature(storage, signature_kind_name):
    """ Returns the signature of the given kind for the storage, creating it if none exists. """
    found = lookup_storage_signature(storage, signature_kind_name)
    if found is None:
        kind = ImageStorageSignatureKind.get(name=signature_kind_name)
        found = ImageStorageSignature.create(storage=storage, kind=kind)

    return found


def lookup_storage_signature(storage, signature_kind_name):
    """ Returns the signature of the given kind for the storage, or None if there is none. """
    kind = ImageStorageSignatureKind.get(name=signature_kind_name)
    try:
        return (ImageStorageSignature
                .select()
                .where(ImageStorageSignature.storage == storage,
                       ImageStorageSignature.kind == kind)
                .get())
    except ImageStorageSignature.DoesNotExist:
        return None


def _get_storage(query_modifier):
    """ Returns the storage selected by applying `query_modifier` to the base placement/storage
        query, with its `locations` attribute set to the names of all locations it is placed in.

        Raises InvalidImageException if the modified query returns no placements.
    """
    query = (ImageStoragePlacement
             .select(ImageStoragePlacement, ImageStorage)
             .switch(ImageStoragePlacement)
             .join(ImageStorage))

    placements = list(query_modifier(query))

    if not placements:
        raise InvalidImageException()

    # The storage is taken from the first placement returned; the locations are gathered from
    # every placement returned.
    found = placements[0].storage
    found.locations = {get_image_location_for_id(placement.location_id).name
                       for placement in placements}
    return found


def get_storage_by_subquery(subquery):
    """ Returns the storage (and its locations) for the storage id returned by the subquery. The
        subquery must return at most 1 result, which is a storage ID.
    """
    def filter_by_subquery(query):
        return query.where(ImageStorage.id == subquery)

    return _get_storage(filter_by_subquery)


def get_storage_by_uuid(storage_uuid):
    """ Returns the storage (and its locations) with the given UUID.

        Raises InvalidImageException if no such storage is found.
    """
    def filter_to_uuid(query):
        return query.where(ImageStorage.uuid == storage_uuid)

    try:
        return _get_storage(filter_to_uuid)
    except InvalidImageException:
        # Interpolate the uuid into the message; exceptions do not apply %-formatting to their
        # arguments the way logging calls do.
        raise InvalidImageException('No storage found with uuid: %s' % storage_uuid)


def get_layer_path(storage_record):
    """ Returns the path in the storage engine to the layer data referenced by the storage row. """
    store = config.store
    if not storage_record.cas_path:
        logger.debug('Serving layer from legacy v1 path')
        return store.v1_image_layer_path(storage_record.uuid)

    return store.blob_path(storage_record.content_checksum)


def lookup_repo_storages_by_content_checksum(repo, checksums):
    """ Looks up repository storages (without placements) matching the given repository and
        checksum.

        Returns an iterable with at most one storage per distinct checksum; an empty list is
        returned when no checksums are given.
    """
    # There may be many duplicates of the checksums, so for performance reasons we are going
    # to use a union to select just one storage with each checksum
    queries = []

    for counter, checksum in enumerate(set(checksums)):
        query_alias = 'q{0}'.format(counter)
        candidate_subq = (ImageStorage
                          .select(ImageStorage.id, ImageStorage.content_checksum,
                                  ImageStorage.image_size, ImageStorage.uuid)
                          .join(Image)
                          .where(Image.repository == repo,
                                 ImageStorage.content_checksum == checksum)
                          .limit(1)
                          .alias(query_alias))
        queries.append(ImageStorage
                       .select(SQL('*'))
                       .from_(candidate_subq))

    if not queries:
        # _reduce_as_tree cannot reduce an empty list of queries.
        return []

    return _reduce_as_tree(queries)


def _reduce_as_tree(queries_to_reduce):
    """ This method will split a list of queries into halves recursively until we reach individual
        queries, at which point it will start unioning the queries, or the already unioned
        subqueries. This works around a bug in peewee SQL generation where reducing linearly
        generates a chain of queries that will exceed the recursion depth limit when it has around
        80 queries.

        `queries_to_reduce` must be a non-empty list.
    """
    # Floor division keeps the midpoint an integer (and therefore a valid slice index) regardless
    # of the interpreter's division semantics.
    mid = len(queries_to_reduce) // 2
    left = queries_to_reduce[:mid]
    right = queries_to_reduce[mid:]

    to_reduce_right = right[0]
    if len(right) > 1:
        to_reduce_right = _reduce_as_tree(right)

    if len(left) > 1:
        to_reduce_left = _reduce_as_tree(left)
    elif len(left) == 1:
        to_reduce_left = left[0]
    else:
        # Single-query input: the left half is empty, so there is nothing to union.
        return to_reduce_right

    return to_reduce_left.union_all(to_reduce_right)


def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
                               uncompressed_size):
    """ Sets metadata that is specific to the binary storage of the data, irrespective of how it
        is used in the layer tree.

        Returns the updated storage. Raises DataModelException if `image_size` is None and
        InvalidImageException if the image cannot be found in the given repository.
    """
    if image_size is None:
        raise DataModelException('Empty image size field')

    try:
        image = (Image
                 .select(Image, ImageStorage)
                 .join(Repository)
                 .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                 .switch(Image)
                 .join(ImageStorage)
                 .where(Repository.name == repository_name,
                        Namespace.username == namespace_name,
                        Image.docker_image_id == docker_image_id)
                 .get())
    except (Image.DoesNotExist, ImageStorage.DoesNotExist):
        # The query is rooted at Image, so a miss raises Image.DoesNotExist. The storage variant
        # is still caught for backward compatibility.
        raise InvalidImageException('No image with specified id and repository')

    # We MUST do this here, it can't be done in the corresponding image call because the storage
    # has not yet been pushed
    image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size,
                                                                     image.parent)
    image.save()

    image.storage.image_size = image_size
    image.storage.uncompressed_size = uncompressed_size
    image.storage.save()
    return image.storage


def get_storage_locations(uuid):
    """ Returns the list of location names in which the storage with the given UUID is placed. """
    query = (ImageStoragePlacement
             .select()
             .join(ImageStorage)
             .where(ImageStorage.uuid == uuid))

    return [get_image_location_for_id(placement.location_id).name for placement in query]


def save_torrent_info(storage_object, piece_length, pieces):
    """ Saves torrent info for the given storage. Does nothing if the create hits an
        IntegrityError, i.e. a record already exists for this storage.
    """
    try:
        TorrentInfo.create(storage=storage_object, piece_length=piece_length, pieces=pieces)
    except IntegrityError:
        # TorrentInfo already exists for this storage.
        pass


def get_torrent_info(blob):
    """ Returns the torrent info for the given storage.

        Raises TorrentInfoDoesNotExist if there is none.
    """
    try:
        return (TorrentInfo
                .select()
                .where(TorrentInfo.storage == blob)
                .get())
    except TorrentInfo.DoesNotExist:
        raise TorrentInfoDoesNotExist