109d69abfd
This potentially fixes an issue with the v2 image field backfill. We should be safe to copy these fields over at link time so that hopefully it doesn't get skipped by the docker client. `_find_or_link_image` should NEVER be used by the registry v2 protocol.
365 lines
14 KiB
Python
365 lines
14 KiB
Python
import logging
|
|
import dateutil.parser
|
|
|
|
from peewee import JOIN_LEFT_OUTER, fn
|
|
from datetime import datetime
|
|
|
|
from data.model import DataModelException, db_transaction, _basequery, storage
|
|
from data.database import (Image, Repository, ImageStoragePlacement, Namespace, ImageStorage,
|
|
ImageStorageLocation, RepositoryPermission, db_for_update)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_parent_images(namespace_name, repository_name, image_obj):
  """ Returns the parent Image objects of image_obj, ordered from the root ancestor down to the
      immediate parent. An image without ancestors yields an empty list.
  """
  # The ancestors value is shaped like /<root>/<intermediate>/.../<parent>/, where every path
  # segment is the database id of an image row.
  ancestor_ids = image_obj.ancestors.strip('/').split('/')
  if ancestor_ids == ['']:
    return []

  def only_ancestors(query):
    return query.where(Image.id << ancestor_ids)

  found = get_repository_images_base(namespace_name, repository_name, only_ancestors)

  # Key by the stringified id so the path segments can be used for lookup directly.
  by_id = {}
  for found_image in found:
    by_id[unicode(found_image.id)] = found_image

  ordered = []
  for ancestor_id in ancestor_ids:
    ordered.append(by_id[ancestor_id])
  return ordered
|
|
|
|
|
|
def get_repo_image(namespace_name, repository_name, docker_image_id):
  """ Looks up the Image row with the given docker image ID inside the named repository.
      Returns None when no such row exists.
  """
  def only_matching_id(query):
    matching = query.where(Image.docker_image_id == docker_image_id)
    return matching.limit(1)

  lookup = _get_repository_images(namespace_name, repository_name, only_matching_id)
  try:
    found = lookup.get()
  except Image.DoesNotExist:
    return None
  return found
|
|
|
|
|
|
def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
  """ Looks up the image with the given docker image ID in the named repository via
      get_repository_images_base, so the result carries its storage and location names.
      Returns None when nothing matches.
  """
  # NOTE(review): the limit applies to the placement rows that get_repository_images_base
  # selects, so at most one location name is collected here -- confirm this is intended.
  def only_matching_id(query):
    return query.where(Image.docker_image_id == docker_image_id).limit(1)

  matches = get_repository_images_base(namespace_name, repository_name, only_matching_id)
  return matches[0] if matches else None
|
|
|
|
|
|
def _get_repository_images(namespace_name, repository_name, query_modifier):
  """ Builds the query selecting Image rows of the named repository, hands it to
      query_modifier and returns whatever the modifier returns.
  """
  in_repository = (Repository.name == repository_name)
  in_namespace = (Namespace.username == namespace_name)

  base_query = (Image
                .select()
                .join(Repository)
                .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                .where(in_repository, in_namespace))

  return query_modifier(base_query)
|
|
|
|
|
|
def get_repository_images_base(namespace_name, repository_name, query_modifier):
  """ Selects the storage placements of the named repository's images, lets query_modifier
      refine that query, and folds the resulting rows into one Image object per image id.

      Each returned image has its storage attached, and image.storage.locations is the set of
      location names seen for that image. Returns the values of the id -> image mapping.
  """
  placements = (ImageStoragePlacement
                .select(ImageStoragePlacement, Image, ImageStorage, ImageStorageLocation)
                .join(ImageStorageLocation)
                .switch(ImageStoragePlacement)
                .join(ImageStorage, JOIN_LEFT_OUTER)
                .join(Image)
                .join(Repository)
                .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                .where(Repository.name == repository_name, Namespace.username == namespace_name))

  rows = list(query_modifier(placements))

  images_by_id = {}
  for row in rows:
    candidate = row.storage.image

    # Reuse the storage that came with the placement row, to prevent another query.
    candidate.storage = row.storage

    # Every placement of one image must feed the same image object: the first one seen.
    canonical = images_by_id.get(candidate.id)
    if canonical is None:
      candidate.storage.locations = set()
      images_by_id[candidate.id] = candidate
      canonical = candidate

    canonical.storage.locations.add(row.location.name)

  return images_by_id.values()
|
|
|
|
|
|
def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
  """ Returns a query for the Image rows of the named repository whose docker image ID is one
      of docker_image_ids.
  """
  images_with_owner = (Image
                       .select()
                       .join(Repository)
                       .join(Namespace, on=(Repository.namespace_user == Namespace.id)))

  return images_with_owner.where(Repository.name == repository_name,
                                 Namespace.username == namespace_name,
                                 Image.docker_image_id << docker_image_ids)
|
|
|
|
|
|
def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
  """ Returns the images of the named repository whose docker image ID is one of
      docker_image_ids, assembled by get_repository_images_base.
  """
  return get_repository_images_base(
    namespace_name, repository_name,
    lambda query: query.where(Image.docker_image_id << docker_image_ids))
|
|
|
|
|
|
def get_repository_images_without_placements(repo_obj, with_ancestor=None):
  """ Returns a query for the images of repo_obj joined with their storage.

      When with_ancestor is given, the query is narrowed to with_ancestor itself plus the
      images whose ancestors value starts with with_ancestor's ancestors followed by its id.
  """
  images = (Image
            .select(Image, ImageStorage)
            .join(ImageStorage)
            .where(Image.repository == repo_obj))

  if not with_ancestor:
    return images

  descendant_prefix = '%s%s/' % (with_ancestor.ancestors, with_ancestor.id)
  is_descendant = Image.ancestors ** (descendant_prefix + '%')
  is_ancestor_itself = Image.id == with_ancestor.id
  return images.where(is_descendant | is_ancestor_itself)
|
|
|
|
|
|
def get_repository_images(namespace_name, repository_name):
  """ Returns all images of the named repository, as assembled by
      get_repository_images_base with an unmodified query.
  """
  def unmodified(query):
    return query

  return get_repository_images_base(namespace_name, repository_name, unmodified)
|
|
|
|
|
|
def get_image_by_id(namespace_name, repository_name, docker_image_id):
  """ Returns the image with the given docker image ID from the named repository, as found by
      get_repo_image_extended. Raises DataModelException when there is no such image.
  """
  found = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
  if found:
    return found

  details = (docker_image_id, namespace_name, repository_name)
  raise DataModelException('Unable to find image \'%s\' for repo \'%s/%s\'' % details)
|
|
|
|
|
|
def __translate_ancestry(old_ancestry, translations, repo_obj, username, preferred_location):
  """ Converts an ancestors path made of existing image row ids into the matching path made of
      image row ids inside repo_obj, and returns it.

      translations maps old image ids to new image ids. Ids already present are reused; any
      other id is resolved through find_create_or_link_image and then recorded in the mapping.
      The root path '/' is returned unchanged.
  """
  if old_ancestry == '/':
    return '/'

  # Fetch the docker image IDs of all ancestors in a single query.
  source_ids = [int(segment) for segment in old_ancestry.split('/')[1:-1]]
  lookup = Image.select(Image.id, Image.docker_image_id).where(Image.id << source_ids)
  docker_ids_by_row = {}
  for row in lookup:
    docker_ids_by_row[row.id] = row.docker_image_id

  # Walk the ancestors in path order, translating each old id into its new counterpart.
  translated = []
  for source_id in source_ids:
    docker_image_id = docker_ids_by_row[source_id]

    logger.debug('Translating id: %s', source_id)
    if source_id not in translations:
      linked = find_create_or_link_image(docker_image_id, repo_obj, username, translations,
                                         preferred_location)
      translations[source_id] = linked.id

    translated.append(str(translations[source_id]))

  return '/%s/' % '/'.join(translated)
|
|
|
|
|
|
def _find_or_link_image(existing_image, repo_obj, username, translations, preferred_location):
  """ Links existing_image into repo_obj by creating a new Image row that shares its storage.

      Returns the image already present in repo_obj when there is one with the same docker
      image ID. Otherwise the new row is created with a translated ancestry and with the
      command, created, comment and aggregate_size values copied from existing_image; the
      old id -> new id pair is recorded in translations and the new image is returned.
      Returns None when Image.DoesNotExist is raised while linking.
  """
  # TODO(jake): This call is currently recursively done under a single transaction. Can we make
  # it instead be done under a set of transactions?
  with db_transaction():
    # Re-check inside the transaction that the repository does not already hold the image.
    already_linked = get_repo_image(repo_obj.namespace_user.username, repo_obj.name,
                                    existing_image.docker_image_id)
    if already_linked:
      return already_linked

    try:
      # Confirm that the image we are about to link from still exists.
      source_query = Image.select().join(ImageStorage).where(Image.id == existing_image.id)
      source_image = source_query.get()

      logger.debug('Linking image to existing storage with docker id: %s and uuid: %s',
                   existing_image.docker_image_id, source_image.storage.uuid)

      linked_ancestry = __translate_ancestry(source_image.ancestors, translations, repo_obj,
                                             username, preferred_location)

      shared_storage = source_image.storage
      shared_storage.locations = set(placement.location.name
                                     for placement in shared_storage.imagestorageplacement_set)

      linked_image = Image.create(docker_image_id=existing_image.docker_image_id,
                                  repository=repo_obj,
                                  storage=shared_storage,
                                  ancestors=linked_ancestry,
                                  command=existing_image.command,
                                  created=existing_image.created,
                                  comment=existing_image.comment,
                                  aggregate_size=existing_image.aggregate_size)

      logger.debug('Storing translation %s -> %s', existing_image.id, linked_image.id)
      translations[existing_image.id] = linked_image.id
      return linked_image
    except Image.DoesNotExist:
      return None
|
|
|
|
|
|
def find_create_or_link_image(docker_image_id, repo_obj, username, translations,
                              preferred_location):
  """ Returns the image with the given docker image ID inside repo_obj, creating it if needed.

      Resolution order: an image already in the repository is returned as-is; otherwise an
      image with the same docker image ID that passes the user's repository filter, and whose
      storage is not uploading, is linked through _find_or_link_image; failing that, a new
      storage is created at preferred_location and a new root image ('/') row is created.
  """
  def image_in_repo():
    return get_repo_image(repo_obj.namespace_user.username, repo_obj.name, docker_image_id)

  # An image that is already part of the repository needs no further work.
  present = image_in_repo()
  if present:
    return present

  # Look for an image elsewhere whose storage the new image could share.
  candidates = (Image
                .select(Image, ImageStorage)
                .distinct()
                .join(ImageStorage)
                .switch(Image)
                .join(Repository)
                .join(RepositoryPermission, JOIN_LEFT_OUTER)
                .switch(Repository)
                .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                # This builds a query expression, so it has to stay an == comparison.
                .where(ImageStorage.uploading == False,
                       Image.docker_image_id == docker_image_id))

  candidates = _basequery.filter_to_repos_for_user(candidates, username)

  # When a candidate exists, try to translate its ancestry and share its storage.
  try:
    logger.debug('Looking up existing image for ID: %s', docker_image_id)
    linkable = candidates.get()

    logger.debug('Existing image %s found for ID: %s', linkable.id, docker_image_id)
    linked = _find_or_link_image(linkable, repo_obj, username, translations,
                                 preferred_location)
    if linked:
      return linked
  except Image.DoesNotExist:
    logger.debug('No existing image found for ID: %s', docker_image_id)

  # Nothing could be linked, so the image gets a storage of its own.
  with db_transaction():
    # Last look for an existing image, this time inside the transaction.
    present = image_in_repo()
    if present:
      return present

    logger.debug('Creating new storage for docker id: %s', docker_image_id)
    fresh_storage = storage.create_storage(preferred_location)

    return Image.create(docker_image_id=docker_image_id,
                        repository=repo_obj,
                        storage=fresh_storage,
                        ancestors='/')
|
|
|
|
|
|
def set_image_metadata(docker_image_id, namespace_name, repository_name, created_date_str, comment,
                       command, v1_json_metadata, parent=None):
  """ Records the metadata of an image in the named repository and returns the updated Image.

      The image row is selected for update inside a transaction. Its storage checksum is
      cleared, and the created time, comment and command are written to both the image and
      its storage row; v1_json_metadata is written to the image only. The created time is
      parsed from created_date_str with any timezone info dropped; when the string is None
      or cannot be parsed, the current time is used instead. If parent is given, the image's
      ancestors become the parent's ancestors followed by the parent's id.

      Raises DataModelException if the repository has no image with the given docker image ID.
  """
  with db_transaction():
    query = (Image
             .select(Image, ImageStorage)
             .join(Repository)
             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
             .switch(Image)
             .join(ImageStorage)
             .where(Repository.name == repository_name, Namespace.username == namespace_name,
                    Image.docker_image_id == docker_image_id))

    try:
      fetched = db_for_update(query).get()
    except Image.DoesNotExist:
      raise DataModelException('No image with specified id and repository')

    # We cleanup any old checksum in case it's a retry after a fail
    fetched.storage.checksum = None

    # Default to the current time; a parseable created_date_str overrides it below.
    created = datetime.now()
    if created_date_str is not None:
      try:
        created = dateutil.parser.parse(created_date_str).replace(tzinfo=None)
      except Exception:
        # parse raises several different exception types, so the handler stays broad. It no
        # longer catches KeyboardInterrupt/SystemExit, and the failure is now logged.
        logger.warning('Unable to parse created date %r for image %s; using the current time',
                       created_date_str, docker_image_id)

    # TODO stop writing to storage fields when all readers are removed
    fetched.storage.created = created
    fetched.storage.comment = comment
    fetched.storage.command = command

    fetched.created = created
    fetched.comment = comment
    fetched.command = command
    fetched.v1_json_metadata = v1_json_metadata

    if parent:
      fetched.ancestors = '%s%s/' % (parent.ancestors, parent.id)

    fetched.save()
    fetched.storage.save()
    return fetched
|
|
|
|
|
|
def set_image_size(docker_image_id, namespace_name, repository_name, image_size, uncompressed_size):
  """ Stores the sizes of an image in the named repository and returns the updated Image.

      image_size and uncompressed_size are written to the image's storage row. The aggregate
      size is written to both the image and its storage: for an image without ancestors it is
      image_size, otherwise it is image_size plus the summed storage sizes of the ancestors.
      When that sum comes back as None the aggregate size is left untouched.

      Raises DataModelException if image_size is None or if the image cannot be found.
  """
  if image_size is None:
    raise DataModelException('Empty image size field')

  lookup = (Image
            .select(Image, ImageStorage)
            .join(Repository)
            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
            .switch(Image)
            .join(ImageStorage, JOIN_LEFT_OUTER)
            .where(Repository.name == repository_name, Namespace.username == namespace_name,
                   Image.docker_image_id == docker_image_id))
  try:
    image = lookup.get()
  except Image.DoesNotExist:
    raise DataModelException('No image with specified id and repository')

  image.storage.image_size = image_size
  image.storage.uncompressed_size = uncompressed_size

  # Work out the aggregate size; None means "leave the stored value alone".
  aggregate_size = None
  ancestor_ids = image.ancestors.split('/')[1:-1]
  if not ancestor_ids:
    aggregate_size = image_size
  else:
    # TODO(jschorr): Switch to this faster route once we have full ancestor aggregate_size
    # parent_image = Image.get(Image.id == ancestors[-1])
    # total_size = image_size + parent_image.storage.aggregate_size
    try:
      ancestor_total = (ImageStorage
                        .select(fn.Sum(ImageStorage.image_size))
                        .join(Image)
                        .where(Image.id << ancestor_ids)
                        .scalar())
    except Image.DoesNotExist:
      ancestor_total = None

    if ancestor_total is not None:
      aggregate_size = ancestor_total + image_size

  # TODO stop writing to storage when all readers are removed
  if aggregate_size is not None:
    image.storage.aggregate_size = aggregate_size
    image.aggregate_size = aggregate_size

  image.storage.save()
  image.save()

  return image
|
|
|
|
|
|
def get_image(repo, dockerfile_id):
  """ Returns the Image of repo whose docker image ID equals dockerfile_id, or None when the
      repository has no such image.
  """
  try:
    found = Image.get(Image.docker_image_id == dockerfile_id, Image.repository == repo)
  except Image.DoesNotExist:
    found = None
  return found
|