236 lines
		
	
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| 
 | |
| from peewee import JOIN_LEFT_OUTER, fn
 | |
| 
 | |
| from data.model import config, db_transaction, InvalidImageException
 | |
| from data.database import (ImageStorage, Image, DerivedImageStorage, ImageStoragePlacement,
 | |
|                            ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature,
 | |
|                            ImageStorageSignatureKind, Repository, Namespace)
 | |
| 
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| def add_storage_placement(storage, location_name):
 | |
|   """ Adds a storage placement for the given storage at the given location. """
 | |
|   location = ImageStorageLocation.get(name=location_name)
 | |
|   ImageStoragePlacement.create(location=location, storage=storage)
 | |
| 
 | |
| 
 | |
| def find_or_create_derived_storage(source, transformation_name, preferred_location):
 | |
|   existing = find_derived_storage(source, transformation_name)
 | |
|   if existing is not None:
 | |
|     return existing
 | |
| 
 | |
|   logger.debug('Creating storage dervied from source: %s', source.uuid)
 | |
|   trans = ImageStorageTransformation.get(name=transformation_name)
 | |
|   new_storage = create_v1_storage(preferred_location)
 | |
|   DerivedImageStorage.create(source=source, derivative=new_storage, transformation=trans)
 | |
|   return new_storage
 | |
| 
 | |
| 
 | |
| def garbage_collect_storage(storage_id_whitelist):
 | |
|   if len(storage_id_whitelist) == 0:
 | |
|     return
 | |
| 
 | |
|   def placements_query_to_paths_set(placements_query):
 | |
|     return {(placement.location.name, config.store.image_path(placement.storage.uuid))
 | |
|             for placement in placements_query}
 | |
| 
 | |
|   def orphaned_storage_query(select_base_query, candidates, group_by):
 | |
|     return (select_base_query
 | |
|             .switch(ImageStorage)
 | |
|             .join(Image, JOIN_LEFT_OUTER)
 | |
|             .switch(ImageStorage)
 | |
|             .join(DerivedImageStorage, JOIN_LEFT_OUTER,
 | |
|                   on=(ImageStorage.id == DerivedImageStorage.derivative))
 | |
|             .where(ImageStorage.id << list(candidates))
 | |
|             .group_by(*group_by)
 | |
|             .having((fn.Count(Image.id) == 0) & (fn.Count(DerivedImageStorage.id) == 0)))
 | |
| 
 | |
|   # Note: We remove the derived image storage in its own transaction as a way to reduce the
 | |
|   # time that the transaction holds on the database indicies. This could result in a derived
 | |
|   # image storage being deleted for an image storage which is later reused during this time,
 | |
|   # but since these are caches anyway, it isn't terrible and worth the tradeoff (for now).
 | |
|   logger.debug('Garbage collecting derived storage from candidates: %s', storage_id_whitelist)
 | |
|   with db_transaction():
 | |
|     # Find out which derived storages will be removed, and add them to the whitelist
 | |
|     # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
 | |
|     orphaned_from_candidates = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
 | |
|                                                            storage_id_whitelist,
 | |
|                                                            (ImageStorage.id,)))
 | |
| 
 | |
|     if len(orphaned_from_candidates) > 0:
 | |
|       derived_to_remove = (ImageStorage
 | |
|                            .select(ImageStorage.id)
 | |
|                            .join(DerivedImageStorage,
 | |
|                                  on=(ImageStorage.id == DerivedImageStorage.derivative))
 | |
|                            .where(DerivedImageStorage.source << orphaned_from_candidates))
 | |
|       storage_id_whitelist.update({derived.id for derived in derived_to_remove})
 | |
| 
 | |
|       # Remove the dervived image storages with sources of orphaned storages
 | |
|       (DerivedImageStorage
 | |
|        .delete()
 | |
|        .where(DerivedImageStorage.source << orphaned_from_candidates)
 | |
|        .execute())
 | |
| 
 | |
|   # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
 | |
|   # storage without any placement is invalid, and a placement cannot exist without a storage.
 | |
|   # TODO(jake): We might want to allow for null storages on placements, which would allow us to
 | |
|   # delete the storages, then delete the placements in a non-transaction.
 | |
|   logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
 | |
|   with db_transaction():
 | |
|     # Track all of the data that should be removed from blob storage
 | |
|     placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
 | |
|                                                        .select(ImageStoragePlacement,
 | |
|                                                                ImageStorage,
 | |
|                                                                ImageStorageLocation)
 | |
|                                                        .join(ImageStorageLocation)
 | |
|                                                        .switch(ImageStoragePlacement)
 | |
|                                                        .join(ImageStorage),
 | |
|                                                        storage_id_whitelist,
 | |
|                                                        (ImageStorage, ImageStoragePlacement,
 | |
|                                                         ImageStorageLocation)))
 | |
| 
 | |
|     paths_to_remove = placements_query_to_paths_set(placements_to_remove)
 | |
| 
 | |
|     # Remove the placements for orphaned storages
 | |
|     if len(placements_to_remove) > 0:
 | |
|       placement_ids_to_remove = [placement.id for placement in placements_to_remove]
 | |
|       placements_removed = (ImageStoragePlacement
 | |
|                             .delete()
 | |
|                             .where(ImageStoragePlacement.id << placement_ids_to_remove)
 | |
|                             .execute())
 | |
|       logger.debug('Removed %s image storage placements', placements_removed)
 | |
| 
 | |
|     # Remove all orphaned storages
 | |
|     # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
 | |
|     orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
 | |
|                                                     storage_id_whitelist,
 | |
|                                                     (ImageStorage.id,)).alias('osq'))
 | |
|     if len(orphaned_storages) > 0:
 | |
|       storages_removed = (ImageStorage
 | |
|                           .delete()
 | |
|                           .where(ImageStorage.id << orphaned_storages)
 | |
|                           .execute())
 | |
|       logger.debug('Removed %s image storage records', storages_removed)
 | |
| 
 | |
|   # We are going to make the conscious decision to not delete image storage blobs inside
 | |
|   # transactions.
 | |
|   # This may end up producing garbage in s3, trading off for higher availability in the database.
 | |
|   for location_name, image_path in paths_to_remove:
 | |
|     logger.debug('Removing %s from %s', image_path, location_name)
 | |
|     config.store.remove({location_name}, image_path)
 | |
| 
 | |
| 
 | |
| def create_v1_storage(location_name):
 | |
|   storage = ImageStorage.create(cas_path=False)
 | |
|   location = ImageStorageLocation.get(name=location_name)
 | |
|   ImageStoragePlacement.create(location=location, storage=storage)
 | |
|   storage.locations = {location_name}
 | |
|   return storage
 | |
| 
 | |
| 
 | |
| def find_or_create_storage_signature(storage, signature_kind):
 | |
|   found = lookup_storage_signature(storage, signature_kind)
 | |
|   if found is None:
 | |
|     kind = ImageStorageSignatureKind.get(name=signature_kind)
 | |
|     found = ImageStorageSignature.create(storage=storage, kind=kind)
 | |
| 
 | |
|   return found
 | |
| 
 | |
| 
 | |
| def lookup_storage_signature(storage, signature_kind):
 | |
|   kind = ImageStorageSignatureKind.get(name=signature_kind)
 | |
|   try:
 | |
|     return (ImageStorageSignature
 | |
|             .select()
 | |
|             .where(ImageStorageSignature.storage == storage, ImageStorageSignature.kind == kind)
 | |
|             .get())
 | |
|   except ImageStorageSignature.DoesNotExist:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def find_derived_storage(source, transformation_name):
 | |
|   try:
 | |
|     found = (ImageStorage
 | |
|              .select(ImageStorage, DerivedImageStorage)
 | |
|              .join(DerivedImageStorage, on=(ImageStorage.id == DerivedImageStorage.derivative))
 | |
|              .join(ImageStorageTransformation)
 | |
|              .where(DerivedImageStorage.source == source,
 | |
|                     ImageStorageTransformation.name == transformation_name)
 | |
|              .get())
 | |
| 
 | |
|     found.locations = {placement.location.name for placement in found.imagestorageplacement_set}
 | |
|     return found
 | |
|   except ImageStorage.DoesNotExist:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def delete_derived_storage_by_uuid(storage_uuid):
 | |
|   try:
 | |
|     image_storage = get_storage_by_uuid(storage_uuid)
 | |
|   except InvalidImageException:
 | |
|     return
 | |
| 
 | |
|   try:
 | |
|     DerivedImageStorage.get(derivative=image_storage)
 | |
|   except DerivedImageStorage.DoesNotExist:
 | |
|     return
 | |
| 
 | |
|   image_storage.delete_instance(recursive=True)
 | |
| 
 | |
| 
 | |
| def _get_storage(query_modifier):
 | |
|   query = (ImageStoragePlacement
 | |
|            .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
 | |
|            .join(ImageStorageLocation)
 | |
|            .switch(ImageStoragePlacement)
 | |
|            .join(ImageStorage))
 | |
| 
 | |
|   placements = list(query_modifier(query))
 | |
| 
 | |
|   if not placements:
 | |
|     raise InvalidImageException()
 | |
| 
 | |
|   found = placements[0].storage
 | |
|   found.locations = {placement.location.name for placement in placements}
 | |
| 
 | |
|   return found
 | |
| 
 | |
| 
 | |
| def get_storage_by_uuid(storage_uuid):
 | |
|   def filter_to_uuid(query):
 | |
|     return query.where(ImageStorage.uuid == storage_uuid)
 | |
| 
 | |
|   try:
 | |
|     return _get_storage(filter_to_uuid)
 | |
|   except InvalidImageException:
 | |
|     raise InvalidImageException('No storage found with uuid: %s', storage_uuid)
 | |
| 
 | |
| 
 | |
| def get_repo_storage_by_checksum(namespace, repository_name, checksum):
 | |
|   def filter_to_repo_and_checksum(query):
 | |
|     return (query
 | |
|             .join(Image)
 | |
|             .join(Repository)
 | |
|             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
 | |
|             .where(Repository.name == repository_name, Namespace.username == namespace,
 | |
|                    ImageStorage.checksum == checksum))
 | |
| 
 | |
|   try:
 | |
|     return _get_storage(filter_to_repo_and_checksum)
 | |
|   except InvalidImageException:
 | |
|     raise InvalidImageException('No storage found with checksum {0}'.format(checksum))
 | |
| 
 | |
| 
 | |
| def get_layer_path(storage_record):
 | |
|   """ Returns the path in the storage engine to the layer data referenced by the storage row. """
 | |
|   store = config.store
 | |
|   if not storage_record.cas_path:
 | |
|     return store.v1_image_layer_path(storage_record.uuid)
 | |
| 
 | |
|   return store.blob_path(storage_record.checksum)
 | |
| 
 | |
| 
 | |
| 
 |