""" Data model helpers for looking up, linking and creating Image rows in a repository. """

import logging
from datetime import datetime

import dateutil.parser
from peewee import JOIN_LEFT_OUTER, fn

from data.model import (DataModelException, db_transaction, _basequery, storage,
                        InvalidImageException, config)
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
                           ImageStorageLocation, RepositoryPermission, db_for_update)


logger = logging.getLogger(__name__)


def _parse_created_date(created_date_str):
    """ Parses the given created-date string into a naive datetime (tzinfo stripped). Returns
        None if the string is None or cannot be parsed.
    """
    if created_date_str is None:
        return None

    try:
        return dateutil.parser.parse(created_date_str).replace(tzinfo=None)
    except Exception:
        # parse raises different exceptions, so we cannot use a specific kind of handler here.
        # Parsing is best-effort: callers fall back to their own default on failure.
        logger.debug('Could not parse created date: %s', created_date_str)
        return None


def get_parent_images(namespace_name, repository_name, image_obj):
    """ Returns a list of the parent Image objects of the given image, in the order in which
        they appear in its ancestors path (root ancestor first, direct parent last).
    """
    parents = image_obj.ancestors

    # Ancestors are in the format /<root>/<intermediate>/.../<parent>/, with each path section
    # containing the database Id of the image row.
    parent_db_ids = parents.strip('/').split('/')

    # An ancestors path of '/' (no parents) splits into a single empty string.
    if parent_db_ids == ['']:
        return []

    def filter_to_parents(query):
        return query.where(Image.id << parent_db_ids)

    parents = get_repository_images_base(namespace_name, repository_name, filter_to_parents)

    # The path sections are strings, so key the lookup table by the string form of the row id.
    id_to_image = {unicode(image.id): image for image in parents}

    return [id_to_image[parent_id] for parent_id in parent_db_ids]


def get_repo_image(namespace_name, repository_name, docker_image_id):
    """ Returns the Image row with the given docker image id in the given repository, or None
        if there is no such image. Storage placements are not loaded.
    """
    def limit_to_image_id(query):
        return query.where(Image.docker_image_id == docker_image_id).limit(1)

    query = _get_repository_images(namespace_name, repository_name, limit_to_image_id)

    try:
        return query.get()
    except Image.DoesNotExist:
        return None


def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
    """ Returns the image with the given docker image id in the given repository, with its
        storage and the storage's `locations` set populated, or None if no image was found.
    """
    def limit_to_image_id(query):
        return query.where(Image.docker_image_id == docker_image_id).limit(1)

    images = get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
    if not images:
        return None

    return images[0]


def _get_repository_images(namespace_name, repository_name, query_modifier):
    """ Builds the query selecting the Image rows of the given repository and returns the result
        of passing it through `query_modifier`.
    """
    query = (Image
             .select()
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .where(Repository.name == repository_name, Namespace.username == namespace_name))

    query = query_modifier(query)
    return query


def get_repository_images_base(namespace_name, repository_name, query_modifier):
    """ Selects the storage placements of the images in the given repository, applies
        `query_modifier` to the query and returns the matching images, each with its storage's
        `locations` set populated.
    """
    query = (ImageStoragePlacement
             .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
             .join(ImageStorageLocation)
             .switch(ImageStoragePlacement)
             .join(ImageStorage, JOIN_LEFT_OUTER)
             .join(Image)
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .where(Repository.name == repository_name, Namespace.username == namespace_name))

    query = query_modifier(query)

    return _translate_placements_to_images_with_locations(query)


def _translate_placements_to_images_with_locations(query):
    """ Executes the given placement query and collapses its rows into one image per image id,
        collecting the location names of all of an image's placements into
        `image.storage.locations`. Returns the values of the id -> image mapping.
    """
    location_list = list(query)

    images = {}
    for location in location_list:
        # Make sure we're always retrieving the same image object.
        image = location.storage.image

        # Set the storage to the one we got from the location, to prevent another query
        image.storage = location.storage

        if image.id not in images:
            images[image.id] = image
            image.storage.locations = set()
        else:
            image = images[image.id]

        # Add the location to the image's locations set.
        image.storage.locations.add(location.location.name)

    return images.values()


def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
    """ Returns a query for the Image rows in the given repository whose docker image id is one
        of `docker_image_ids`.
    """
    return (Image
            .select()
            .join(Repository)
            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
            .where(Repository.name == repository_name, Namespace.username == namespace_name,
                   Image.docker_image_id << docker_image_ids))


def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
    """ Returns the images (with storage locations populated) in the given repository whose
        docker image id is one of `docker_image_ids`.
    """
    def modify_query(query):
        return query.where(Image.docker_image_id << list(docker_image_ids))

    return get_repository_images_base(namespace_name, repository_name, modify_query)


def get_repository_images_without_placements(repo_obj, with_ancestor=None):
    """ Returns a query for the images (joined with their storage) of the given repository. If
        `with_ancestor` is given, the query is limited to that image and the images whose
        ancestors path starts with its full path.
    """
    query = (Image
             .select(Image, ImageStorage)
             .join(ImageStorage)
             .where(Image.repository == repo_obj))

    if with_ancestor:
        ancestors_string = '%s%s/' % (with_ancestor.ancestors, with_ancestor.id)
        query = query.where((Image.ancestors ** (ancestors_string + '%')) |
                            (Image.id == with_ancestor.id))

    return query


def get_repository_images(namespace_name, repository_name):
    """ Returns all images (with storage locations populated) of the given repository. """
    return get_repository_images_base(namespace_name, repository_name, lambda q: q)


def get_image_by_id(namespace_name, repository_name, docker_image_id):
    """ Returns the image (with storage locations populated) with the given docker image id in
        the given repository. Raises InvalidImageException if it cannot be found.
    """
    image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
    if not image:
        raise InvalidImageException('Unable to find image \'%s\' for repo \'%s/%s\'' %
                                    (docker_image_id, namespace_name, repository_name))
    return image


def __translate_ancestry(old_ancestry, translations, repo_obj, username, preferred_location):
    """ Rewrites the given ancestors path so that each image row id in it is replaced by the id
        of the corresponding image in `repo_obj`, finding, linking or creating those images as
        needed. `translations` maps old row ids to new row ids and is updated in place.
    """
    if old_ancestry == '/':
        return '/'

    def translate_id(old_id, docker_image_id):
        logger.debug('Translating id: %s', old_id)
        if old_id not in translations:
            image_in_repo = find_create_or_link_image(docker_image_id, repo_obj, username,
                                                      translations, preferred_location)
            translations[old_id] = image_in_repo.id
        return translations[old_id]

    # Select all the ancestor Docker IDs in a single query.
    old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
    query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
    old_images = {i.id: i.docker_image_id for i in query}

    # Translate the old images into new ones.
    new_ids = [str(translate_id(old_id, old_images[old_id])) for old_id in old_ids]
    return '/%s/' % '/'.join(new_ids)


def _find_or_link_image(existing_image, repo_obj, username, translations, preferred_location):
    """ Returns the image in `repo_obj` with the docker image id of `existing_image`, creating
        it (sharing the storage of `existing_image` and with a translated ancestry) if it is not
        already there. Returns None if `existing_image` no longer exists.
    """
    # TODO(jake): This call is currently recursively done under a single transaction. Can we make
    # it instead be done under a set of transactions?
    with db_transaction():
        # Check for an existing image, under the transaction, to make sure it doesn't already
        # exist.
        repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                    existing_image.docker_image_id)
        if repo_image:
            return repo_image

        # Make sure the existing base image still exists.
        try:
            to_copy = Image.select().join(ImageStorage).where(Image.id == existing_image.id).get()

            msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
            logger.debug(msg, existing_image.docker_image_id, to_copy.storage.uuid)

            new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repo_obj,
                                                      username, preferred_location)

            copied_storage = to_copy.storage
            copied_storage.locations = {placement.location.name
                                        for placement in copied_storage.imagestorageplacement_set}

            new_image = Image.create(docker_image_id=existing_image.docker_image_id,
                                     repository=repo_obj, storage=copied_storage,
                                     ancestors=new_image_ancestry)

            logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
            translations[existing_image.id] = new_image.id
            return new_image
        except Image.DoesNotExist:
            return None


def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
                              preferred_location):
    """ Returns the image with the given docker image id in `repo_obj`. If the repository does
        not have it yet, the image is linked to the storage of a matching image found through
        the repositories filtered for `username`, or, failing that, created with a new storage
        in `preferred_location`.
    """
    # First check for the image existing in the repository. If found, we simply return it.
    repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                docker_image_id)
    if repo_image:
        return repo_image

    # We next check to see if there is an existing storage the new image can link to.
    # Note: `== False` is required here; it builds a SQL expression rather than a Python bool.
    existing_image_query = (Image
                            .select(Image, ImageStorage)
                            .distinct()
                            .join(ImageStorage)
                            .switch(Image)
                            .join(Repository)
                            .join(RepositoryPermission, JOIN_LEFT_OUTER)
                            .switch(Repository)
                            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                            .where(ImageStorage.uploading == False,
                                   Image.docker_image_id == docker_image_id))

    existing_image_query = _basequery.filter_to_repos_for_user(existing_image_query, username)

    # If there is an existing image, we try to translate its ancestry and copy its storage.
    try:
        logger.debug('Looking up existing image for ID: %s', docker_image_id)
        existing_image = existing_image_query.get()

        logger.debug('Existing image %s found for ID: %s', existing_image.id, docker_image_id)
        new_image = _find_or_link_image(existing_image, repo_obj, username, translations,
                                        preferred_location)
        if new_image:
            return new_image
    except Image.DoesNotExist:
        logger.debug('No existing image found for ID: %s', docker_image_id)

    # Otherwise, create a new storage directly.
    with db_transaction():
        # Final check for an existing image, under the transaction.
        repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                    docker_image_id)
        if repo_image:
            return repo_image

        logger.debug('Creating new storage for docker id: %s', docker_image_id)
        new_storage = storage.create_v1_storage(preferred_location)

        return Image.create(docker_image_id=docker_image_id,
                            repository=repo_obj, storage=new_storage,
                            ancestors='/')


def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str,
                       comment, command, v1_json_metadata, parent=None):
    """ Updates the created date, comment, command, v1 JSON metadata and (if `parent` is given)
        the ancestors path of the image with the given docker image id in the given repository,
        and returns the saved image. The created date is parsed from `created_date_str` when
        possible and otherwise defaults to the current time. Raises DataModelException if the
        image does not exist.
    """
    with db_transaction():
        query = (Image
                 .select(Image, ImageStorage)
                 .join(Repository)
                 .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                 .switch(Image)
                 .join(ImageStorage)
                 .where(Repository.name == repository_name, Namespace.username == namespace_name,
                        Image.docker_image_id == docker_image_id))

        try:
            fetched = db_for_update(query).get()
        except Image.DoesNotExist:
            raise DataModelException('No image with specified id and repository')

        # We cleanup any old checksum in case it's a retry after a fail
        # NOTE(review): only `fetched` is saved below, not `fetched.storage`, so this reset is
        # in-memory only as far as this function is concerned -- confirm that is intended.
        fetched.storage.checksum = None

        fetched.created = datetime.now()
        created = _parse_created_date(created_date_str)
        if created is not None:
            # Record the parsed date on the image row, which is the row saved below. Previously
            # it was only assigned to the (unsaved) storage row, so it was never persisted.
            fetched.created = created

            # Kept so that callers reading the date off the returned image's storage still see
            # it; the storage row itself is not saved here.
            fetched.storage.created = created

        fetched.comment = comment
        fetched.command = command
        fetched.v1_json_metadata = v1_json_metadata

        if parent:
            fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)

        fetched.save()
        return fetched


def set_image_size(docker_image_id, namespace_name, repository_name, image_size,
                   uncompressed_size):
    """ Stores the compressed and uncompressed sizes on the storage of the image with the given
        docker image id in the given repository, along with its aggregate size (its own size
        plus the summed sizes of its ancestors, when that sum is known). Returns the image.
        Raises DataModelException if the image does not exist.
    """
    try:
        image = (Image
                 .select(Image, ImageStorage)
                 .join(Repository)
                 .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                 .switch(Image)
                 .join(ImageStorage, JOIN_LEFT_OUTER)
                 .where(Repository.name == repository_name, Namespace.username == namespace_name,
                        Image.docker_image_id == docker_image_id)
                 .get())
    except Image.DoesNotExist:
        raise DataModelException('No image with specified id and repository')

    image.storage.image_size = image_size
    image.storage.uncompressed_size = uncompressed_size

    ancestors = image.ancestors.split('/')[1:-1]
    if ancestors:
        try:
            # TODO(jschorr): Switch to this faster route once we have full ancestor aggregate_size
            # parent_image = Image.get(Image.id == ancestors[-1])
            # total_size = image_size + parent_image.storage.aggregate_size
            ancestor_size = (ImageStorage
                             .select(fn.Sum(ImageStorage.image_size))
                             .join(Image)
                             .where(Image.id << ancestors)
                             .scalar())

            # The sum is None when no ancestor has a recorded size; in that case the aggregate
            # size is unknown, so leave it untouched rather than failing on None + int.
            if ancestor_size is not None:
                image.storage.aggregate_size = ancestor_size + image_size
        except Image.DoesNotExist:
            pass
    else:
        image.storage.aggregate_size = image_size

    image.storage.save()

    return image


def get_image(repo, docker_image_id):
    """ Returns the Image row with the given docker image id in the given repository object, or
        None if there is no such image.
    """
    try:
        return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo)
    except Image.DoesNotExist:
        return None


def get_repo_image_by_storage_checksum(namespace, repository_name, storage_checksum):
    """ Returns an image in the given repository whose storage has the given checksum and is not
        marked as uploading. Raises InvalidImageException if there is no such image.
    """
    try:
        # Note: `== False` is required here; it builds a SQL expression rather than a Python bool.
        return (Image
                .select()
                .join(ImageStorage)
                .switch(Image)
                .join(Repository)
                .join(Namespace, on=(Namespace.id == Repository.namespace_user))
                .where(Repository.name == repository_name, Namespace.username == namespace,
                       ImageStorage.checksum == storage_checksum,
                       ImageStorage.uploading == False)
                .get())
    except Image.DoesNotExist:
        msg = 'Image with storage checksum {0} does not exist in repo {1}/{2}'.format(
            storage_checksum, namespace, repository_name)
        raise InvalidImageException(msg)


def has_image_json(image):
    """ Returns whether there exists a JSON definition data for the image. """
    if image.v1_json_metadata:
        return True

    # Fall back to checking the configured store when the metadata is not on the row.
    store = config.store
    return store.exists(image.storage.locations, store.image_json_path(image.storage.uuid))


def get_image_json(image):
    """ Returns the JSON definition data for the image. """
    if image.v1_json_metadata:
        return image.v1_json_metadata

    # Fall back to reading from the configured store when the metadata is not on the row.
    store = config.store
    return store.get_content(image.storage.locations, store.image_json_path(image.storage.uuid))


def get_image_layers(image):
    """ Returns a list of the full layers of an image, including the image itself, sorted from
        base image outward.
    """
    ancestors = image.ancestors.split('/')[1:-1]
    image_ids = [ancestor_id for ancestor_id in ancestors if ancestor_id]
    image_ids.append(str(image.id))

    query = (ImageStoragePlacement
             .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
             .join(ImageStorageLocation)
             .switch(ImageStoragePlacement)
             .join(ImageStorage, JOIN_LEFT_OUTER)
             .join(Image)
             .where(Image.id << image_ids))

    image_list = list(_translate_placements_to_images_with_locations(query))

    # Order the layers by their position in the ancestors path, with the image itself last.
    image_list.sort(key=lambda layer: image_ids.index(str(layer.id)))
    return image_list


def synthesize_v1_image(namespace, repository_name, storage_checksum, docker_image_id,
                        created_date_str, comment, command, v1_json_metadata, parent_docker_id):
    """ Find an existing image with this docker image id, and if none exists, write one with the
        specified metadata. Raises InvalidImageException if `parent_docker_id` is given but no
        such image exists in the repository.
    """
    repo = _basequery.get_existing_repository(namespace, repository_name)

    # Sometimes the manifest may reference an image that already exists
    found = get_image(repo, docker_image_id)
    if found is not None:
        # The image already exists, nothing to do
        return found

    the_bits = storage.get_repo_storage_by_checksum(namespace, repository_name, storage_checksum)

    ancestors = '/'
    if parent_docker_id is not None:
        parent = get_repo_image(namespace, repository_name, parent_docker_id)
        if parent is None:
            msg = 'Parent not found with docker image id {0} in repo {1}/{2}'.format(
                parent_docker_id, namespace, repository_name)
            raise InvalidImageException(msg)
        ancestors = '{0}{1}/'.format(parent.ancestors, parent.id)

    # Best-effort: an unparseable (or missing) date leaves `created` as None.
    created = _parse_created_date(created_date_str)

    return Image.create(docker_image_id=docker_image_id, ancestors=ancestors, comment=comment,
                        command=command, v1_json_metadata=v1_json_metadata, created=created,
                        storage=the_bits, repository=repo)