import logging
import hashlib
import json

import dateutil.parser

from datetime import datetime

from peewee import JOIN_LEFT_OUTER, IntegrityError, fn

from data.model import (DataModelException, db_transaction, _basequery, storage,
                        InvalidImageException)
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
                           ImageStorageLocation, RepositoryPermission, DerivedStorageForImage,
                           ImageStorageTransformation)

from util.canonicaljson import canonicalize


logger = logging.getLogger(__name__)


def get_image_with_storage(docker_image_id, storage_uuid):
    """ Returns the image with the given docker image ID and storage uuid or None if none. """
    try:
        return (Image
                .select()
                .join(ImageStorage)
                .where(Image.docker_image_id == docker_image_id,
                       ImageStorage.uuid == storage_uuid)
                .get())
    except Image.DoesNotExist:
        return None


def get_parent_images_with_placements(namespace_name, repository_name, image_obj):
    """ Returns a list of parent Image objects starting with the most recent parent
        and ending with the base layer. The images in this query will include the storage and
        placements.
    """
    return _get_parent_images(namespace_name, repository_name, image_obj,
                              include_placements=True)


def get_parent_images(namespace_name, repository_name, image_obj):
    """ Returns a list of parent Image objects starting with the most recent parent
        and ending with the base layer. The images in this query will include the storage but
        not the placements.
    """
    return _get_parent_images(namespace_name, repository_name, image_obj,
                              include_placements=False)


def _get_parent_images(namespace_name, repository_name, image_obj, include_placements=False):
    """ Shared implementation for the two parent-image lookups above. Returns the parents of
        image_obj ordered from the closest parent to the base layer, raising
        DataModelException if an ancestor listed on the image is not returned by the query.
    """
    parents = image_obj.ancestors

    # Ancestors are in the format /<root>/<intermediate>/.../<parent>/, with each path section
    # containing the database Id of the image row.
    parent_db_ids = parents.strip('/').split('/')
    if parent_db_ids == ['']:
        return []

    def filter_to_parents(query):
        return query.where(Image.id << parent_db_ids)

    if include_placements:
        parents = get_repository_images_base(namespace_name, repository_name,
                                             filter_to_parents)
    else:
        parents = _get_repository_images_and_storages(namespace_name, repository_name,
                                                      filter_to_parents)

    # Key by the stringified id so lookups match the string sections split out of the
    # ancestors path above.
    id_to_image = {unicode(image.id): image for image in parents}
    try:
        return [id_to_image[parent_id] for parent_id in reversed(parent_db_ids)]
    except KeyError:
        raise DataModelException('Unknown parent image')


def get_repo_image(namespace_name, repository_name, docker_image_id):
    """ Returns the Image row with the given docker image ID in the repository, or None. """
    def limit_to_image_id(query):
        return query.where(Image.docker_image_id == docker_image_id).limit(1)

    query = _get_repository_images(namespace_name, repository_name, limit_to_image_id)
    try:
        return query.get()
    except Image.DoesNotExist:
        return None


def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
    """ Returns the image with the given docker image ID in the repository, with its storage
        and storage locations populated, or None if no such image was found.
    """
    def limit_to_image_id(query):
        return query.where(Image.docker_image_id == docker_image_id)

    images = get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
    if not images:
        return None

    return images[0]


def get_repo_image_and_storage(namespace_name, repository_name, docker_image_id):
    """ Returns the image with the given docker image ID in the repository, selected together
        with its storage (but no placements), or None if no such image was found.
    """
    def limit_to_image_id(query):
        return query.where(Image.docker_image_id == docker_image_id)

    images = _get_repository_images_and_storages(namespace_name, repository_name,
                                                 limit_to_image_id)
    if not images:
        return None

    return images[0]


def _get_repository_images_and_storages(namespace_name, repository_name, query_modifier):
    """ Builds the Image + ImageStorage query for the named repository and returns it after
        passing it through query_modifier.
    """
    query = (Image
             .select(Image, ImageStorage)
             .join(ImageStorage)
             .switch(Image)
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .where(Repository.name == repository_name, Namespace.username == namespace_name))

    query = query_modifier(query)
    return query


def _get_repository_images(namespace_name, repository_name, query_modifier):
    """ Builds the Image-only query for the named repository and returns it after passing it
        through query_modifier.
    """
    query = (Image
             .select()
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .where(Repository.name == repository_name, Namespace.username == namespace_name))

    query = query_modifier(query)
    return query


def get_repository_images_base(namespace_name, repository_name, query_modifier):
    """ Queries the placements of all images in the named repository (after applying
        query_modifier) and returns the images with their storages and location sets, as
        produced by invert_placement_query_results.
    """
    query = (ImageStoragePlacement
             .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
             .join(ImageStorageLocation)
             .switch(ImageStoragePlacement)
             .join(ImageStorage, JOIN_LEFT_OUTER)
             .join(Image)
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .where(Repository.name == repository_name, Namespace.username == namespace_name))

    query = query_modifier(query)
    return invert_placement_query_results(query)


def invert_placement_query_results(placement_query):
    """ This method will take a query which returns placements, storages, and images, and have
        it return images and their storages, along with the placement set on each storage.
    """
    location_list = list(placement_query)

    images = {}
    for location in location_list:
        # Make sure we're always retrieving the same image object.
        image = location.storage.image

        # Set the storage to the one we got from the location, to prevent another query
        image.storage = location.storage

        if image.id not in images:
            images[image.id] = image
            image.storage.locations = set()
        else:
            image = images[image.id]

        # Add the location to the image's locations set.
        image.storage.locations.add(location.location.name)

    return images.values()


def lookup_repository_images(repo, docker_image_ids):
    """ Returns a query for the images (with storages) in repo whose docker image ID is in
        docker_image_ids.
    """
    return (Image
            .select(Image, ImageStorage)
            .join(ImageStorage)
            .where(Image.repository == repo, Image.docker_image_id << docker_image_ids))


def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
    """ Returns the images (with storages and locations) in the named repository whose docker
        image ID is in docker_image_ids.
    """
    def modify_query(query):
        return query.where(Image.docker_image_id << list(docker_image_ids))

    return get_repository_images_base(namespace_name, repository_name, modify_query)


def get_repository_images_without_placements(repo_obj, with_ancestor=None):
    """ Returns a query for the images (with storages) in repo_obj. If with_ancestor is given,
        the query is limited to that image and the images whose ancestors path starts with
        its ancestry.
    """
    query = (Image
             .select(Image, ImageStorage)
             .join(ImageStorage)
             .where(Image.repository == repo_obj))

    if with_ancestor:
        ancestors_string = '%s%s/' % (with_ancestor.ancestors, with_ancestor.id)
        query = query.where((Image.ancestors ** (ancestors_string + '%')) |
                            (Image.id == with_ancestor.id))

    return query


def get_repository_images(namespace_name, repository_name):
    """ Returns all images (with storages and locations) in the named repository. """
    return get_repository_images_base(namespace_name, repository_name, lambda q: q)


def get_image_by_id(namespace_name, repository_name, docker_image_id):
    """ Returns the extended image with the given docker image ID in the named repository,
        raising InvalidImageException if it cannot be found.
    """
    image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
    if not image:
        raise InvalidImageException('Unable to find image \'%s\' for repo \'%s/%s\'' %
                                    (docker_image_id, namespace_name, repository_name))
    return image


def __translate_ancestry(old_ancestry, translations, repo_obj, username, preferred_location):
    """ Translates an ancestors path of image database IDs into the equivalent path of image
        IDs in repo_obj, finding, creating or linking each ancestor as needed. The
        translations dict (old id -> new id) is consulted and updated in place.
    """
    if old_ancestry == '/':
        return '/'

    def translate_id(old_id, docker_image_id):
        logger.debug('Translating id: %s', old_id)
        if old_id not in translations:
            image_in_repo = find_create_or_link_image(docker_image_id, repo_obj, username,
                                                      translations, preferred_location)
            translations[old_id] = image_in_repo.id
        return translations[old_id]

    # Select all the ancestor Docker IDs in a single query.
    old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
    query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
    old_images = {i.id: i.docker_image_id for i in query}

    # Translate the old images into new ones.
    new_ids = [str(translate_id(old_id, old_images[old_id])) for old_id in old_ids]
    return '/%s/' % '/'.join(new_ids)


def _find_or_link_image(existing_image, repo_obj, username, translations, preferred_location):
    """ Returns the image in repo_obj with the same docker image ID as existing_image,
        creating it as a link to existing_image's storage if it is not present yet. Returns
        None if existing_image no longer exists.
    """
    # TODO(jake): This call is currently recursively done under a single transaction. Can we make
    # it instead be done under a set of transactions?
    with db_transaction():
        # Check for an existing image, under the transaction, to make sure it doesn't already
        # exist.
        repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                    existing_image.docker_image_id)
        if repo_image:
            return repo_image

        # Make sure the existing base image still exists.
        try:
            to_copy = Image.select().join(ImageStorage).where(Image.id == existing_image.id).get()

            msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
            logger.debug(msg, existing_image.docker_image_id, to_copy.storage.uuid)

            new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repo_obj,
                                                      username, preferred_location)

            copied_storage = to_copy.storage
            copied_storage.locations = {placement.location.name
                                        for placement in copied_storage.imagestorageplacement_set}

            # The direct parent is the last section of the translated ancestors path.
            translated_parent_id = None
            if new_image_ancestry != '/':
                translated_parent_id = int(new_image_ancestry.split('/')[-2])

            new_image = Image.create(docker_image_id=existing_image.docker_image_id,
                                     repository=repo_obj,
                                     storage=copied_storage,
                                     ancestors=new_image_ancestry,
                                     command=existing_image.command,
                                     created=existing_image.created,
                                     comment=existing_image.comment,
                                     v1_json_metadata=existing_image.v1_json_metadata,
                                     aggregate_size=existing_image.aggregate_size,
                                     parent=translated_parent_id,
                                     v1_checksum=existing_image.v1_checksum)

            logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
            translations[existing_image.id] = new_image.id
            return new_image
        except Image.DoesNotExist:
            return None


def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
                              preferred_location):
    """ Returns the image with the given docker image ID in repo_obj. If it does not exist
        yet, it is linked to the storage of a matching image visible to username when one is
        found, and otherwise created with a brand new storage.
    """
    # First check for the image existing in the repository. If found, we simply return it.
    repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                docker_image_id)
    if repo_image:
        return repo_image

    # We next check to see if there is an existing storage the new image can link to.
    existing_image_query = (Image
                            .select(Image, ImageStorage)
                            .distinct()
                            .join(ImageStorage)
                            .switch(Image)
                            .join(Repository)
                            .join(RepositoryPermission, JOIN_LEFT_OUTER)
                            .switch(Repository)
                            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                            .where(ImageStorage.uploading == False,
                                   Image.docker_image_id == docker_image_id))

    existing_image_query = _basequery.filter_to_repos_for_user(existing_image_query, username)

    # If there is an existing image, we try to translate its ancestry and copy its storage.
    new_image = None
    try:
        logger.debug('Looking up existing image for ID: %s', docker_image_id)
        existing_image = existing_image_query.get()

        logger.debug('Existing image %s found for ID: %s', existing_image.id, docker_image_id)
        new_image = _find_or_link_image(existing_image, repo_obj, username, translations,
                                        preferred_location)
        if new_image:
            return new_image
    except Image.DoesNotExist:
        logger.debug('No existing image found for ID: %s', docker_image_id)

    # Otherwise, create a new storage directly.
    with db_transaction():
        # Final check for an existing image, under the transaction.
        repo_image = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                    docker_image_id)
        if repo_image:
            return repo_image

        logger.debug('Creating new storage for docker id: %s', docker_image_id)
        new_storage = storage.create_v1_storage(preferred_location)

        return Image.create(docker_image_id=docker_image_id,
                            repository=repo_obj, storage=new_storage,
                            ancestors='/')


def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str,
                       comment, command, v1_json_metadata, parent=None):
    """ Sets metadata that is specific to how a binary piece of storage fits into the layer
        tree. Raises DataModelException if the image does not exist in the repository.
    """
    with db_transaction():
        try:
            fetched = (Image
                       .select(Image, ImageStorage)
                       .join(Repository)
                       .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                       .switch(Image)
                       .join(ImageStorage)
                       .where(Repository.name == repository_name,
                              Namespace.username == namespace_name,
                              Image.docker_image_id == docker_image_id)
                       .get())
        except Image.DoesNotExist:
            raise DataModelException('No image with specified id and repository')

        # Default to the current time; overridden below when a parseable date was supplied.
        fetched.created = datetime.now()
        if created_date_str is not None:
            try:
                fetched.created = dateutil.parser.parse(created_date_str).replace(tzinfo=None)
            except Exception:
                # parse raises different exceptions, so we cannot use a specific kind of handler
                # here. Catching Exception (rather than a bare except) keeps this best-effort
                # without swallowing KeyboardInterrupt/SystemExit.
                pass

        # We cleanup any old checksum in case it's a retry after a fail
        fetched.v1_checksum = None
        fetched.storage.content_checksum = None

        fetched.comment = comment
        fetched.command = command
        fetched.v1_json_metadata = v1_json_metadata

        if parent:
            fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)
            fetched.parent = parent

        # NOTE(review): persisting both the image and its storage here is presumed from the
        # fields mutated above -- confirm against version control.
        fetched.save()
        fetched.storage.save()
        return fetched


def get_image(repo, docker_image_id):
    """ Returns the Image row with the given docker image ID in repo, or None. """
    try:
        return Image.get(Image.docker_image_id == docker_image_id, Image.repository == repo)
    except Image.DoesNotExist:
        return None


def get_repo_image_by_storage_checksum(namespace, repository_name, storage_checksum):
    """ Returns the image in the named repository whose fully uploaded storage has the given
        content checksum, raising InvalidImageException if none exists.
    """
    try:
        return (Image
                .select()
                .join(ImageStorage)
                .switch(Image)
                .join(Repository)
                .join(Namespace, on=(Namespace.id == Repository.namespace_user))
                .where(Repository.name == repository_name, Namespace.username == namespace,
                       ImageStorage.content_checksum == storage_checksum,
                       ImageStorage.uploading == False)
                .get())
    except Image.DoesNotExist:
        msg = 'Image with storage checksum {0} does not exist in repo {1}/{2}'.format(
            storage_checksum, namespace, repository_name)
        raise InvalidImageException(msg)


def get_image_layers(image):
    """ Returns a list of the full layers of an image, including itself (if specified), sorted
        from base image outward.
    """
    image_ids = image.ancestor_id_list() + [image.id]

    query = (ImageStoragePlacement
             .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
             .join(ImageStorageLocation)
             .switch(ImageStoragePlacement)
             .join(ImageStorage, JOIN_LEFT_OUTER)
             .join(Image)
             .where(Image.id << image_ids))

    # Order the results the same way as image_ids: ancestors first, the image itself last.
    image_list = list(invert_placement_query_results(query))
    image_list.sort(key=lambda img: image_ids.index(img.id))
    return image_list


def synthesize_v1_image(repo, image_storage, docker_image_id, created_date_str, comment,
                        command, v1_json_metadata, parent_image=None):
    """ Find an existing image with this docker image id, and if none exists, write one with
        the specified metadata.
    """
    ancestors = '/'
    if parent_image is not None:
        ancestors = '{0}{1}/'.format(parent_image.ancestors, parent_image.id)

    created = None
    if created_date_str is not None:
        try:
            created = dateutil.parser.parse(created_date_str).replace(tzinfo=None)
        except Exception:
            # parse raises different exceptions, so we cannot use a specific kind of handler
            # here. Catching Exception (rather than a bare except) keeps this best-effort
            # without swallowing KeyboardInterrupt/SystemExit.
            pass

    # Get the aggregate size for the image.
    aggregate_size = _basequery.calculate_image_aggregate_size(ancestors,
                                                               image_storage.image_size,
                                                               parent_image)

    try:
        return Image.create(docker_image_id=docker_image_id, ancestors=ancestors,
                            comment=comment, command=command,
                            v1_json_metadata=v1_json_metadata, created=created,
                            storage=image_storage, repository=repo, parent=parent_image,
                            aggregate_size=aggregate_size)
    except IntegrityError:
        # The image already exists (or was created concurrently); return the existing row.
        return Image.get(docker_image_id=docker_image_id, repository=repo)


def ensure_image_locations(*names):
    """ Inserts an ImageStorageLocation row for every name in names that does not already
        have one.
    """
    with db_transaction():
        locations = ImageStorageLocation.select().where(ImageStorageLocation.name << names)

        insert_names = list(names)

        for location in locations:
            insert_names.remove(location.name)

        if not insert_names:
            return

        data = [{'name': name} for name in insert_names]
        ImageStorageLocation.insert_many(data).execute()


def get_max_id_for_sec_scan():
    """ Gets the maximum id for a clair sec scan """
    # NOTE(review): expression presumed to mirror get_min_id_for_sec_scan without the engine
    # filter -- confirm against version control.
    return Image.select(fn.Max(Image.id)).scalar()


def get_min_id_for_sec_scan(version):
    """ Gets the minimum id for a clair sec scan """
    return (Image
            .select(fn.Min(Image.id))
            .where(Image.security_indexed_engine < version)
            .scalar())


def total_image_count():
    """ Returns the total number of images in DB """
    # NOTE(review): expression presumed from the docstring -- confirm against version control.
    return Image.select().count()


def get_image_pk_field():
    """ Returns the primary key for Image DB model """
    return Image.id


def get_images_eligible_for_scan(clair_version):
    """ Returns a query that gives all images eligible for a clair scan """
    return (get_image_with_storage_and_parent_base()
            .where(Image.security_indexed_engine < clair_version))


def get_image_with_storage_and_parent_base():
    """ Returns a query selecting each image with its storage, plus (via left outer joins)
        its parent image and the parent's storage.
    """
    Parent = Image.alias()
    ParentImageStorage = ImageStorage.alias()

    return (Image
            .select(Image, ImageStorage, Parent, ParentImageStorage)
            .join(ImageStorage)
            .switch(Image)
            .join(Parent, JOIN_LEFT_OUTER, on=(Image.parent == Parent.id))
            .join(ParentImageStorage, JOIN_LEFT_OUTER,
                  on=(ParentImageStorage.id == Parent.storage)))


def set_secscan_status(image, indexed, version):
    """ Updates the security-indexed flag and engine version on every image sharing this
        image's docker image ID and storage uuid. Returns whether any row was changed.
    """
    query = (Image
             .select()
             .join(ImageStorage)
             .where(Image.docker_image_id == image.docker_image_id,
                    ImageStorage.uuid == image.storage.uuid))

    ids_to_update = [row.id for row in query]
    if not ids_to_update:
        return False

    # Only touch rows whose status actually differs, so the affected-row count reflects
    # whether anything changed.
    return (Image
            .update(security_indexed=indexed, security_indexed_engine=version)
            .where(Image.id << ids_to_update)
            .where((Image.security_indexed_engine != version) |
                   (Image.security_indexed != indexed))
            .execute()) != 0


def _get_uniqueness_hash(varying_metadata):
    """ Returns the SHA-256 hex digest of the canonicalized JSON form of varying_metadata, or
        None if no metadata was given.
    """
    if not varying_metadata:
        return None

    return hashlib.sha256(json.dumps(canonicalize(varying_metadata))).hexdigest()


def find_or_create_derived_storage(source_image, transformation_name, preferred_location,
                                   varying_metadata=None):
    """ Returns the storage derived from source_image under the named transformation (and
        varying metadata), creating a new storage at preferred_location if none exists yet.
    """
    existing = find_derived_storage_for_image(source_image, transformation_name,
                                              varying_metadata)
    if existing is not None:
        return existing

    uniqueness_hash = _get_uniqueness_hash(varying_metadata)
    trans = ImageStorageTransformation.get(name=transformation_name)
    new_storage = storage.create_v1_storage(preferred_location)

    try:
        DerivedStorageForImage.create(source_image=source_image, derivative=new_storage,
                                      transformation=trans, uniqueness_hash=uniqueness_hash)
    except IntegrityError:
        # Storage was created while this method executed. Just return the existing.
        new_storage.delete_instance(recursive=True)
        return find_derived_storage_for_image(source_image, transformation_name,
                                              varying_metadata)

    return new_storage


def find_derived_storage_for_image(source_image, transformation_name, varying_metadata=None):
    """ Returns the storage derived from source_image under the named transformation (and
        varying metadata) with its locations set populated, or None if there is none.
    """
    uniqueness_hash = _get_uniqueness_hash(varying_metadata)

    try:
        found = (ImageStorage
                 .select(ImageStorage, DerivedStorageForImage)
                 .join(DerivedStorageForImage)
                 .join(ImageStorageTransformation)
                 .where(DerivedStorageForImage.source_image == source_image,
                        ImageStorageTransformation.name == transformation_name,
                        DerivedStorageForImage.uniqueness_hash == uniqueness_hash)
                 .get())

        found.locations = {placement.location.name
                           for placement in found.imagestorageplacement_set}
        return found
    except ImageStorage.DoesNotExist:
        return None


def delete_derived_storage_by_uuid(storage_uuid):
    """ Deletes the storage with the given uuid if it exists and is a derived storage;
        otherwise does nothing.
    """
    try:
        image_storage = storage.get_storage_by_uuid(storage_uuid)
    except InvalidImageException:
        return

    # Only storages registered as a derivative may be removed through this path.
    try:
        DerivedStorageForImage.get(derivative=image_storage)
    except DerivedStorageForImage.DoesNotExist:
        return

    image_storage.delete_instance(recursive=True)