2015-07-15 21:25:41 +00:00
|
|
|
import logging
|
|
|
|
|
2015-11-19 09:03:14 +00:00
|
|
|
from peewee import JOIN_LEFT_OUTER, fn, SQL
|
2015-07-15 21:25:41 +00:00
|
|
|
|
2016-01-05 17:14:52 +00:00
|
|
|
from data.model import (config, db_transaction, InvalidImageException, TorrentInfoDoesNotExist,
|
|
|
|
DataModelException, _basequery)
|
|
|
|
from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
|
|
|
|
ImageStorageTransformation, ImageStorageSignature,
|
2016-01-08 21:38:02 +00:00
|
|
|
ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)
|
2015-07-15 21:25:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2015-06-28 10:29:22 +00:00
|
|
|
def add_storage_placement(storage, location_name):
    """ Creates a placement row tying `storage` to the location named `location_name`.

    The location row is looked up by name first; the lookup is not guarded here, so a failure
    to find it propagates to the caller.
    """
    ImageStoragePlacement.create(
        location=ImageStorageLocation.get(name=location_name),
        storage=storage,
    )
|
|
|
|
|
|
|
|
|
2015-07-15 21:25:41 +00:00
|
|
|
def garbage_collect_storage(storage_id_whitelist):
    """ Removes the storages from `storage_id_whitelist` that no Image row references anymore.

    For each orphaned candidate, its placement rows, its torrent info rows and the storage row
    itself are deleted within one database transaction. Once the transaction block has exited,
    the matching paths are removed through `config.store`. An empty `storage_id_whitelist` is a
    no-op.
    """
    if len(storage_id_whitelist) == 0:
        return

    def collect_paths(placements):
        # One (location name, layer path) pair per placement; the set drops duplicates.
        return {(entry.location.name, get_layer_path(entry.storage)) for entry in placements}

    def restrict_to_orphans(base_query, candidate_ids, grouping):
        # The LEFT OUTER JOIN keeps storages that have no image at all; the HAVING clause then
        # retains only the groups for which zero images were counted.
        with_images = base_query.switch(ImageStorage).join(Image, JOIN_LEFT_OUTER)
        among_candidates = with_images.where(ImageStorage.id << list(candidate_ids))
        return among_candidates.group_by(*grouping).having(fn.Count(Image.id) == 0)

    # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
    # storage without any placement is invalid, and a placement cannot exist without a storage.
    # TODO(jake): We might want to allow for null storages on placements, which would allow us to
    # delete the storages, then delete the placements in a non-transaction.
    logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
    with db_transaction():
        # Track all of the data that should be removed from blob storage
        placement_base = (ImageStoragePlacement
                          .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
                          .join(ImageStorageLocation)
                          .switch(ImageStoragePlacement)
                          .join(ImageStorage))
        placement_grouping = (ImageStorage.id, ImageStoragePlacement.id, ImageStorageLocation.id)
        doomed_placements = list(restrict_to_orphans(placement_base, storage_id_whitelist,
                                                     placement_grouping))

        paths_to_remove = collect_paths(doomed_placements)

        # Remove the placements for orphaned storages
        if doomed_placements:
            doomed_placement_ids = [entry.id for entry in doomed_placements]
            delete_placements = (ImageStoragePlacement
                                 .delete()
                                 .where(ImageStoragePlacement.id << doomed_placement_ids))
            placements_removed = delete_placements.execute()
            logger.debug('Removed %s image storage placements', placements_removed)

        # Remove all orphaned storages
        # The grouping must be a sequence because it is unpacked into group_by(), hence the
        # one-element tuple (note the trailing comma).
        storage_grouping = (ImageStorage.id,)
        orphan_query = restrict_to_orphans(ImageStorage.select(ImageStorage.id),
                                           storage_id_whitelist, storage_grouping)
        orphaned_storages = list(orphan_query.alias('osq'))
        if orphaned_storages:
            delete_torrents = TorrentInfo.delete().where(TorrentInfo.storage << orphaned_storages)
            torrents_removed = delete_torrents.execute()
            logger.debug('Removed %s torrent info records', torrents_removed)

            delete_storages = ImageStorage.delete().where(ImageStorage.id << orphaned_storages)
            storages_removed = delete_storages.execute()
            logger.debug('Removed %s image storage records', storages_removed)

    # We are going to make the conscious decision to not delete image storage blobs inside
    # transactions.
    # This may end up producing garbage in s3, trading off for higher availability in the database.
    for location_name, image_path in paths_to_remove:
        logger.debug('Removing %s from %s', image_path, location_name)
        config.store.remove({location_name}, image_path)
|
|
|
|
|
|
|
|
|
2015-08-12 20:39:32 +00:00
|
|
|
def create_v1_storage(location_name):
    """ Creates a storage row with `cas_path=False` plus one placement at `location_name`.

    Returns the new storage, with its `locations` attribute set to a set holding just
    `location_name`.
    """
    new_storage = ImageStorage.create(cas_path=False)
    ImageStoragePlacement.create(
        location=ImageStorageLocation.get(name=location_name),
        storage=new_storage,
    )
    new_storage.locations = {location_name}
    return new_storage
|
|
|
|
|
|
|
|
|
|
|
|
def find_or_create_storage_signature(storage, signature_kind):
    """ Returns the signature of kind `signature_kind` for `storage`, creating it if missing.

    `signature_kind` is the name of an ImageStorageSignatureKind row.
    """
    existing = lookup_storage_signature(storage, signature_kind)
    if existing is not None:
        return existing

    kind_row = ImageStorageSignatureKind.get(name=signature_kind)
    return ImageStorageSignature.create(storage=storage, kind=kind_row)
|
|
|
|
|
|
|
|
|
|
|
|
def lookup_storage_signature(storage, signature_kind):
    """ Returns the signature of kind `signature_kind` for `storage`, or None if there is none.

    `signature_kind` is the name of an ImageStorageSignatureKind row; that lookup happens outside
    the guarded section, so its failure propagates to the caller.
    """
    kind_row = ImageStorageSignatureKind.get(name=signature_kind)
    matching = (ImageStorageSignature
                .select()
                .where(ImageStorageSignature.storage == storage,
                       ImageStorageSignature.kind == kind_row))
    try:
        return matching.get()
    except ImageStorageSignature.DoesNotExist:
        return None
|
|
|
|
|
|
|
|
|
2015-08-12 20:39:32 +00:00
|
|
|
def _get_storage(query_modifier):
    """ Loads a single storage together with the names of all locations it is placed in.

    `query_modifier` receives the base placement query (placements joined to their location and
    storage) and must return the filtered query to run. The storage of the first resulting
    placement is returned, with `locations` set to the location names of all resulting
    placements. Raises InvalidImageException when the query yields no placements.
    """
    base_query = (ImageStoragePlacement
                  .select(ImageStoragePlacement, ImageStorage, ImageStorageLocation)
                  .join(ImageStorageLocation)
                  .switch(ImageStoragePlacement)
                  .join(ImageStorage))

    matched = list(query_modifier(base_query))
    if not matched:
        raise InvalidImageException()

    storage = matched[0].storage
    storage.locations = {row.location.name for row in matched}
    return storage
|
|
|
|
|
|
|
|
|
2015-12-03 21:19:22 +00:00
|
|
|
def get_storage_by_subquery(subquery):
    """ Returns the storage (and its locations) whose id is produced by `subquery`.

    The subquery must return at most 1 result, which is a storage ID.
    """
    return _get_storage(lambda query: query.where(ImageStorage.id == subquery))
|
|
|
|
|
|
|
|
|
2015-08-12 20:39:32 +00:00
|
|
|
def get_storage_by_uuid(storage_uuid):
    """ Returns the storage (and its locations) whose uuid equals `storage_uuid`.

    Raises InvalidImageException, with the uuid included in the message, when no matching
    storage placement is found.
    """
    def filter_to_uuid(query):
        return query.where(ImageStorage.uuid == storage_uuid)

    try:
        return _get_storage(filter_to_uuid)
    except InvalidImageException:
        # Interpolate the uuid explicitly: unlike logging calls, exception constructors do not
        # apply %-style arguments to the message.
        raise InvalidImageException('No storage found with uuid: %s' % storage_uuid)
|
|
|
|
|
|
|
|
|
2015-08-18 15:53:48 +00:00
|
|
|
def get_layer_path(storage_record):
    """ Returns the storage engine path of the layer data that `storage_record` refers to.

    Rows with a truthy `cas_path` resolve to the blob path of their content checksum; all other
    rows resolve to the legacy v1 layer path derived from their uuid.
    """
    store = config.store
    if storage_record.cas_path:
        return store.blob_path(storage_record.content_checksum)

    logger.debug('Serving layer from legacy v1 path')
    return store.v1_image_layer_path(storage_record.uuid)
|
2015-08-13 21:14:17 +00:00
|
|
|
|
2015-11-19 09:03:14 +00:00
|
|
|
|
2015-11-06 23:18:29 +00:00
|
|
|
def lookup_repo_storages_by_content_checksum(repo, checksums):
    """ Looks up repository storages (without placements) matching the given repository
    and checksums.

    At most one storage is selected per distinct checksum. Returns an empty list when
    `checksums` is empty, otherwise the union of the per-checksum queries.
    """
    # Materialize once so that duplicates are dropped and one-shot iterables are supported.
    unique_checksums = set(checksums)
    if not unique_checksums:
        # There is nothing to union; reducing an empty list of queries would raise IndexError.
        return []

    # There may be many duplicates of the checksums, so for performance reasons we are going
    # to use a union to select just one storage with each checksum
    queries = []

    for counter, checksum in enumerate(unique_checksums):
        query_alias = 'q{0}'.format(counter)
        candidate_subq = (ImageStorage
                          .select(ImageStorage.id, ImageStorage.content_checksum,
                                  ImageStorage.image_size)
                          .join(Image)
                          .where(Image.repository == repo,
                                 ImageStorage.content_checksum == checksum)
                          .limit(1)
                          .alias(query_alias))
        queries.append(ImageStorage
                       .select(SQL('*'))
                       .from_(candidate_subq))

    return _reduce_as_tree(queries)
|
|
|
|
|
|
|
|
|
|
|
|
def _reduce_as_tree(queries_to_reduce):
|
|
|
|
""" This method will split a list of queries into halves recursively until we reach individual
|
|
|
|
queries, at which point it will start unioning the queries, or the already unioned subqueries.
|
|
|
|
This works around a bug in peewee SQL generation where reducing linearly generates a chain
|
|
|
|
of queries that will exceed the recursion depth limit when it has around 80 queries.
|
|
|
|
"""
|
|
|
|
mid = len(queries_to_reduce)/2
|
|
|
|
left = queries_to_reduce[:mid]
|
|
|
|
right = queries_to_reduce[mid:]
|
|
|
|
|
|
|
|
to_reduce_right = right[0]
|
|
|
|
if len(right) > 1:
|
|
|
|
to_reduce_right = _reduce_as_tree(right)
|
|
|
|
|
|
|
|
if len(left) > 1:
|
|
|
|
to_reduce_left = _reduce_as_tree(left)
|
|
|
|
elif len(left) == 1:
|
|
|
|
to_reduce_left = left[0]
|
|
|
|
else:
|
|
|
|
return to_reduce_right
|
|
|
|
|
|
|
|
return to_reduce_left.union_all(to_reduce_right)
|
2015-11-19 09:03:14 +00:00
|
|
|
|
2015-11-17 22:42:52 +00:00
|
|
|
|
2016-01-05 17:14:52 +00:00
|
|
|
def set_image_storage_metadata(docker_image_id, namespace_name, repository_name, image_size,
                               uncompressed_size):
    """ Sets metadata that is specific to the binary storage of the data, irrespective of how it
    is used in the layer tree.

    The image is located by `docker_image_id` within the repository `repository_name` of the
    namespace `namespace_name`. Its aggregate size is recomputed and saved, then `image_size`
    and `uncompressed_size` are written to its storage row, which is returned.

    Raises DataModelException when `image_size` is None, and InvalidImageException when no
    matching image exists.
    """
    if image_size is None:
        raise DataModelException('Empty image size field')

    try:
        image = (Image
                 .select(Image, ImageStorage)
                 .join(Repository)
                 .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                 .switch(Image)
                 .join(ImageStorage)
                 .where(Repository.name == repository_name, Namespace.username == namespace_name,
                        Image.docker_image_id == docker_image_id)
                 .get())
    except Image.DoesNotExist:
        # The query selects from Image, so a miss surfaces as Image.DoesNotExist rather than
        # ImageStorage.DoesNotExist.
        raise InvalidImageException('No image with specified id and repository')

    # We MUST do this here, it can't be done in the corresponding image call because the storage
    # has not yet been pushed
    image.aggregate_size = _basequery.calculate_image_aggregate_size(image.ancestors, image_size,
                                                                     image.parent)
    image.save()

    image.storage.image_size = image_size
    image.storage.uncompressed_size = uncompressed_size
    image.storage.save()
    return image.storage
|
|
|
|
|
|
|
|
|
2015-11-17 22:42:52 +00:00
|
|
|
def get_storage_locations(uuid):
    """ Returns the list of location names in which the storage with the given uuid is placed. """
    placements = (ImageStoragePlacement
                  .select()
                  .join(ImageStorageLocation)
                  .switch(ImageStoragePlacement)
                  .join(ImageStorage)
                  .where(ImageStorage.uuid == uuid))

    location_names = []
    for placement in placements:
        location_names.append(placement.location.name)
    return location_names
|
2015-12-30 22:19:19 +00:00
|
|
|
|
|
|
|
|
|
|
|
def save_torrent_info(storage_object, piece_length, pieces):
    """ Creates a TorrentInfo row for `storage_object` with the given piece length and pieces.

    Nothing is returned.
    """
    TorrentInfo.create(
        storage=storage_object,
        piece_length=piece_length,
        pieces=pieces,
    )
|
2015-12-31 19:09:50 +00:00
|
|
|
|
|
|
|
def get_torrent_info(blob):
    """ Returns the TorrentInfo row whose storage is `blob`.

    Raises TorrentInfoDoesNotExist when there is no such row.
    """
    matching = TorrentInfo.select().where(TorrentInfo.storage == blob)
    try:
        return matching.get()
    except TorrentInfo.DoesNotExist:
        raise TorrentInfoDoesNotExist
|