Merge branch 'perf'
This commit is contained in:
commit
e7cbda86f7
7 changed files with 219 additions and 95 deletions
|
@ -35,6 +35,36 @@ class CallableProxy(Proxy):
|
||||||
raise AttributeError('Cannot use uninitialized Proxy.')
|
raise AttributeError('Cannot use uninitialized Proxy.')
|
||||||
return self.obj(*args, **kwargs)
|
return self.obj(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class CloseForLongOperation(object):
    """Context manager that releases the database before a slow operation.

    On entry it calls ``close_db_filter(None)``. On exit it does nothing:
    per the original author's note, the next SQL call is expected to
    reconnect automatically.
    """

    def __init__(self, config_object):
        # Stored on the instance; no method of this class reads it back.
        self.config_object = config_object

    def __enter__(self):
        # Drop the connection up front so it is not held during the long work.
        close_db_filter(None)

    def __exit__(self, type, value, traceback):
        """Leave the context; exceptions from the body are not suppressed."""
        # Note: Nothing to do. The next SQL call will reconnect automatically.
        return None
|
||||||
|
|
||||||
|
|
||||||
|
class UseThenDisconnect(object):
    """Context manager that configures the database, then tears it down.

    On entry it calls ``configure`` with the stored config object. On exit it
    calls ``close_db_filter(None)``, whether or not the body raised.
    """

    def __init__(self, config_object):
        # Configuration mapping handed to ``configure`` when the context opens.
        self.config_object = config_object

    def __enter__(self):
        # (Re)initialize the database from the stored configuration.
        configure(self.config_object)

    def __exit__(self, type, value, traceback):
        # Always disconnect on the way out. The implicit None return means
        # exceptions raised inside the ``with`` body still propagate.
        close_db_filter(None)
|
||||||
|
|
||||||
|
|
||||||
db = Proxy()
|
db = Proxy()
|
||||||
read_slave = Proxy()
|
read_slave = Proxy()
|
||||||
db_random_func = CallableProxy()
|
db_random_func = CallableProxy()
|
||||||
|
@ -56,6 +86,7 @@ def _db_from_url(url, db_kwargs):
|
||||||
|
|
||||||
|
|
||||||
def configure(config_object):
|
def configure(config_object):
|
||||||
|
logger.debug('Configuring database')
|
||||||
db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
|
db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
|
||||||
write_db_uri = config_object['DB_URI']
|
write_db_uri = config_object['DB_URI']
|
||||||
db.initialize(_db_from_url(write_db_uri, db_kwargs))
|
db.initialize(_db_from_url(write_db_uri, db_kwargs))
|
||||||
|
|
|
@ -1035,16 +1035,26 @@ def get_repository(namespace_name, repository_name):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_repo_image(namespace_name, repository_name, image_id):
|
def get_repo_image(namespace_name, repository_name, docker_image_id):
|
||||||
def limit_to_image_id(query):
|
def limit_to_image_id(query):
|
||||||
return query.where(Image.docker_image_id == image_id)
|
return query.where(Image.docker_image_id == docker_image_id).limit(1)
|
||||||
|
|
||||||
|
query = _get_repository_images(namespace_name, repository_name, limit_to_image_id)
|
||||||
|
try:
|
||||||
|
return query.get()
|
||||||
|
except Image.DoesNotExist:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
|
||||||
|
def limit_to_image_id(query):
|
||||||
|
return query.where(Image.docker_image_id == docker_image_id).limit(1)
|
||||||
|
|
||||||
images = _get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
|
images = _get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
|
||||||
if not images:
|
if not images:
|
||||||
return None
|
return None
|
||||||
else:
|
|
||||||
return images[0]
|
|
||||||
|
|
||||||
|
return images[0]
|
||||||
|
|
||||||
def repository_is_public(namespace_name, repository_name):
|
def repository_is_public(namespace_name, repository_name):
|
||||||
try:
|
try:
|
||||||
|
@ -1137,20 +1147,21 @@ def __translate_ancestry(old_ancestry, translations, repository, username, prefe
|
||||||
if old_ancestry == '/':
|
if old_ancestry == '/':
|
||||||
return '/'
|
return '/'
|
||||||
|
|
||||||
def translate_id(old_id):
|
def translate_id(old_id, docker_image_id):
|
||||||
logger.debug('Translating id: %s', old_id)
|
logger.debug('Translating id: %s', old_id)
|
||||||
if old_id not in translations:
|
if old_id not in translations:
|
||||||
# Figure out which docker_image_id the old id refers to, then find a
|
image_in_repo = find_create_or_link_image(docker_image_id, repository, username,
|
||||||
# a local one
|
|
||||||
old = Image.select(Image.docker_image_id).where(Image.id == old_id).get()
|
|
||||||
image_in_repo = find_create_or_link_image(old.docker_image_id, repository, username,
|
|
||||||
translations, preferred_location)
|
translations, preferred_location)
|
||||||
translations[old_id] = image_in_repo.id
|
translations[old_id] = image_in_repo.id
|
||||||
|
|
||||||
return translations[old_id]
|
return translations[old_id]
|
||||||
|
|
||||||
|
# Select all the ancestor Docker IDs in a single query.
|
||||||
old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
|
old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
|
||||||
new_ids = [str(translate_id(old_id)) for old_id in old_ids]
|
query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
|
||||||
|
old_images = {i.id: i.docker_image_id for i in query}
|
||||||
|
|
||||||
|
# Translate the old images into new ones.
|
||||||
|
new_ids = [str(translate_id(old_id, old_images[old_id])) for old_id in old_ids]
|
||||||
return '/%s/' % '/'.join(new_ids)
|
return '/%s/' % '/'.join(new_ids)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1162,36 +1173,22 @@ def _create_storage(location_name):
|
||||||
return storage
|
return storage
|
||||||
|
|
||||||
|
|
||||||
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
def _find_or_link_image(existing_image, repository, username, translations, preferred_location):
|
||||||
preferred_location):
|
# TODO(jake): This call is currently recursively done under a single transaction. Can we make
|
||||||
|
# it instead be done under a set of transactions?
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
# Check for an existing image, under the transaction, to make sure it doesn't already exist.
|
||||||
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
||||||
docker_image_id)
|
existing_image.docker_image_id)
|
||||||
if repo_image:
|
if repo_image:
|
||||||
return repo_image
|
return repo_image
|
||||||
|
|
||||||
query = (Image
|
# Make sure the existing base image still exists.
|
||||||
.select(Image, ImageStorage)
|
|
||||||
.distinct()
|
|
||||||
.join(ImageStorage)
|
|
||||||
.switch(Image)
|
|
||||||
.join(Repository)
|
|
||||||
.join(Visibility)
|
|
||||||
.switch(Repository)
|
|
||||||
.join(RepositoryPermission, JOIN_LEFT_OUTER)
|
|
||||||
.switch(Repository)
|
|
||||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
|
||||||
.where(ImageStorage.uploading == False))
|
|
||||||
|
|
||||||
query = (_filter_to_repos_for_user(query, username)
|
|
||||||
.where(Image.docker_image_id == docker_image_id))
|
|
||||||
|
|
||||||
new_image_ancestry = '/'
|
|
||||||
origin_image_id = None
|
|
||||||
try:
|
try:
|
||||||
to_copy = query.get()
|
to_copy = Image.select().join(ImageStorage).where(Image.id == existing_image.id).get()
|
||||||
|
|
||||||
msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
|
msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
|
||||||
logger.debug(msg, docker_image_id, to_copy.storage.uuid)
|
logger.debug(msg, existing_image.docker_image_id, to_copy.storage.uuid)
|
||||||
|
|
||||||
new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repository,
|
new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repository,
|
||||||
username, preferred_location)
|
username, preferred_location)
|
||||||
|
@ -1199,25 +1196,73 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
|
||||||
storage = to_copy.storage
|
storage = to_copy.storage
|
||||||
storage.locations = {placement.location.name
|
storage.locations = {placement.location.name
|
||||||
for placement in storage.imagestorageplacement_set}
|
for placement in storage.imagestorageplacement_set}
|
||||||
origin_image_id = to_copy.id
|
|
||||||
|
new_image = Image.create(docker_image_id=existing_image.docker_image_id,
|
||||||
|
repository=repository, storage=storage,
|
||||||
|
ancestors=new_image_ancestry)
|
||||||
|
|
||||||
|
logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
|
||||||
|
translations[existing_image.id] = new_image.id
|
||||||
|
return new_image
|
||||||
except Image.DoesNotExist:
|
except Image.DoesNotExist:
|
||||||
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
return None
|
||||||
storage = _create_storage(preferred_location)
|
|
||||||
|
|
||||||
logger.debug('Storage locations: %s', storage.locations)
|
|
||||||
|
|
||||||
new_image = Image.create(docker_image_id=docker_image_id,
|
|
||||||
repository=repository, storage=storage,
|
|
||||||
ancestors=new_image_ancestry)
|
|
||||||
|
|
||||||
logger.debug('new_image storage locations: %s', new_image.storage.locations)
|
|
||||||
|
|
||||||
|
|
||||||
if origin_image_id:
|
def find_create_or_link_image(docker_image_id, repository, username, translations,
|
||||||
logger.debug('Storing translation %s -> %s', origin_image_id, new_image.id)
|
preferred_location):
|
||||||
translations[origin_image_id] = new_image.id
|
|
||||||
|
|
||||||
return new_image
|
# First check for the image existing in the repository. If found, we simply return it.
|
||||||
|
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
||||||
|
docker_image_id)
|
||||||
|
if repo_image:
|
||||||
|
return repo_image
|
||||||
|
|
||||||
|
# We next check to see if there is an existing storage the new image can link to.
|
||||||
|
existing_image_query = (Image
|
||||||
|
.select(Image, ImageStorage)
|
||||||
|
.distinct()
|
||||||
|
.join(ImageStorage)
|
||||||
|
.switch(Image)
|
||||||
|
.join(Repository)
|
||||||
|
.join(Visibility)
|
||||||
|
.switch(Repository)
|
||||||
|
.join(RepositoryPermission, JOIN_LEFT_OUTER)
|
||||||
|
.switch(Repository)
|
||||||
|
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||||
|
.where(ImageStorage.uploading == False))
|
||||||
|
|
||||||
|
existing_image_query = (_filter_to_repos_for_user(existing_image_query, username)
|
||||||
|
.where(Image.docker_image_id == docker_image_id))
|
||||||
|
|
||||||
|
# If there is an existing image, we try to translate its ancestry and copy its storage.
|
||||||
|
new_image = None
|
||||||
|
try:
|
||||||
|
logger.debug('Looking up existing image for ID: %s', docker_image_id)
|
||||||
|
existing_image = existing_image_query.get()
|
||||||
|
|
||||||
|
logger.debug('Existing image %s found for ID: %s', existing_image.id, docker_image_id)
|
||||||
|
new_image = _find_or_link_image(existing_image, repository, username, translations,
|
||||||
|
preferred_location)
|
||||||
|
if new_image:
|
||||||
|
return new_image
|
||||||
|
except Image.DoesNotExist:
|
||||||
|
logger.debug('No existing image found for ID: %s', docker_image_id)
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Otherwise, create a new storage directly.
|
||||||
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
# Final check for an existing image, under the transaction.
|
||||||
|
repo_image = get_repo_image(repository.namespace_user.username, repository.name,
|
||||||
|
docker_image_id)
|
||||||
|
if repo_image:
|
||||||
|
return repo_image
|
||||||
|
|
||||||
|
logger.debug('Creating new storage for docker id: %s', docker_image_id)
|
||||||
|
storage = _create_storage(preferred_location)
|
||||||
|
|
||||||
|
return Image.create(docker_image_id=docker_image_id,
|
||||||
|
repository=repository, storage=storage,
|
||||||
|
ancestors='/')
|
||||||
|
|
||||||
|
|
||||||
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
def find_or_create_derived_storage(source, transformation_name, preferred_location):
|
||||||
|
@ -1331,6 +1376,15 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
|
||||||
fetched.storage.save()
|
fetched.storage.save()
|
||||||
return fetched
|
return fetched
|
||||||
|
|
||||||
|
def _get_repository_images(namespace_name, repository_name, query_modifier):
    """Build a query for the Image rows of one repository.

    The base query selects Image, joined to Repository and to Namespace (via
    ``Repository.namespace_user``), filtered to the given repository name and
    namespace username. ``query_modifier`` is then called with that query and
    whatever it returns is handed back to the caller unchanged.
    """
    in_repository = (Repository.name == repository_name)
    in_namespace = (Namespace.username == namespace_name)

    base_query = (Image
                  .select()
                  .join(Repository)
                  .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                  .where(in_repository, in_namespace))

    # Let the caller narrow (or otherwise adjust) the query before it is run.
    return query_modifier(base_query)
|
||||||
|
|
||||||
def _get_repository_images_base(namespace_name, repository_name, query_modifier):
|
def _get_repository_images_base(namespace_name, repository_name, query_modifier):
|
||||||
query = (ImageStoragePlacement
|
query = (ImageStoragePlacement
|
||||||
|
@ -1367,6 +1421,20 @@ def _get_repository_images_base(namespace_name, repository_name, query_modifier)
|
||||||
return images.values()
|
return images.values()
|
||||||
|
|
||||||
|
|
||||||
|
def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
    """Return a query for the repository's images matching the given Docker IDs.

    Selects Image rows joined to Repository and Namespace, restricted to the
    named repository and namespace and to rows whose ``docker_image_id`` is in
    ``docker_image_ids``. The query object itself is returned; nothing is
    executed here.
    """
    matches_repository = (Repository.name == repository_name)
    matches_namespace = (Namespace.username == namespace_name)
    # ``<<`` is the operator this file uses for "value IN collection".
    matches_docker_ids = (Image.docker_image_id << docker_image_ids)

    repository_images = (Image
                         .select()
                         .join(Repository)
                         .join(Namespace, on=(Repository.namespace_user == Namespace.id)))

    return repository_images.where(matches_repository, matches_namespace,
                                   matches_docker_ids)
|
||||||
|
|
||||||
|
def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
    """Fetch the repository's images whose Docker IDs are in ``docker_image_ids``.

    Delegates to ``_get_repository_images_base`` with a modifier that adds the
    ``docker_image_id`` IN filter, and returns that helper's result as-is.
    """
    return _get_repository_images_base(
        namespace_name,
        repository_name,
        # Narrow the base query to just the requested Docker image IDs.
        lambda query: query.where(Image.docker_image_id << docker_image_ids))
|
||||||
|
|
||||||
def get_repository_images(namespace_name, repository_name):
|
def get_repository_images(namespace_name, repository_name):
|
||||||
return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
|
return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
|
||||||
|
|
||||||
|
@ -1385,6 +1453,9 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
storage_id_whitelist = {}
|
storage_id_whitelist = {}
|
||||||
|
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
# TODO (jake): We could probably select this and all the images in a single query using
|
||||||
|
# a different kind of join.
|
||||||
|
|
||||||
# Get a list of all images used by tags in the repository
|
# Get a list of all images used by tags in the repository
|
||||||
tag_query = (RepositoryTag
|
tag_query = (RepositoryTag
|
||||||
.select(RepositoryTag, Image, ImageStorage)
|
.select(RepositoryTag, Image, ImageStorage)
|
||||||
|
@ -1403,7 +1474,7 @@ def garbage_collect_repository(namespace_name, repository_name):
|
||||||
referenced_anscestors = referenced_anscestors.union(set(ancestor_list))
|
referenced_anscestors = referenced_anscestors.union(set(ancestor_list))
|
||||||
referenced_anscestors.add(tag.image.id)
|
referenced_anscestors.add(tag.image.id)
|
||||||
|
|
||||||
all_repo_images = get_repository_images(namespace_name, repository_name)
|
all_repo_images = _get_repository_images(namespace_name, repository_name, lambda q: q)
|
||||||
all_images = {int(img.id): img for img in all_repo_images}
|
all_images = {int(img.id): img for img in all_repo_images}
|
||||||
to_remove = set(all_images.keys()).difference(referenced_anscestors)
|
to_remove = set(all_images.keys()).difference(referenced_anscestors)
|
||||||
|
|
||||||
|
@ -1461,8 +1532,8 @@ def garbage_collect_storage(storage_id_whitelist):
|
||||||
|
|
||||||
# Note: Both of these deletes must occur in the same transaction (unfortunately) because a
|
# Note: Both of these deletes must occur in the same transaction (unfortunately) because a
|
||||||
# storage without any placement is invalid, and a placement cannot exist without a storage.
|
# storage without any placement is invalid, and a placement cannot exist without a storage.
|
||||||
# TODO: We might want to allow for null storages on placements, which would allow us to delete
|
# TODO(jake): We might want to allow for null storages on placements, which would allow us to
|
||||||
# the storages, then delete the placements in a non-transaction.
|
# delete the storages, then delete the placements in a non-transaction.
|
||||||
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
# Track all of the data that should be removed from blob storage
|
# Track all of the data that should be removed from blob storage
|
||||||
|
@ -1516,7 +1587,7 @@ def get_tag_image(namespace_name, repository_name, tag_name):
|
||||||
|
|
||||||
|
|
||||||
def get_image_by_id(namespace_name, repository_name, docker_image_id):
|
def get_image_by_id(namespace_name, repository_name, docker_image_id):
|
||||||
image = get_repo_image(namespace_name, repository_name, docker_image_id)
|
image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
|
||||||
if not image:
|
if not image:
|
||||||
raise DataModelException('Unable to find image \'%s\' for repo \'%s/%s\'' %
|
raise DataModelException('Unable to find image \'%s\' for repo \'%s/%s\'' %
|
||||||
(docker_image_id, namespace_name, repository_name))
|
(docker_image_id, namespace_name, repository_name))
|
||||||
|
@ -1747,11 +1818,10 @@ def get_repository_delegate_tokens(namespace_name, repository_name):
|
||||||
|
|
||||||
def get_repo_delegate_token(namespace_name, repository_name, code):
|
def get_repo_delegate_token(namespace_name, repository_name, code):
|
||||||
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
|
repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
|
||||||
found = list(repo_query.where(AccessToken.code == code))
|
|
||||||
|
|
||||||
if found:
|
try:
|
||||||
return found[0]
|
return repo_query.where(AccessToken.code == code).get()
|
||||||
else:
|
except AccessToken.DoesNotExist:
|
||||||
raise InvalidTokenException('Unable to find token with code: %s' % code)
|
raise InvalidTokenException('Unable to find token with code: %s' % code)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1920,9 +1990,9 @@ def list_logs(start_time, end_time, performer=None, repository=None, namespace=N
|
||||||
if namespace:
|
if namespace:
|
||||||
joined = joined.where(User.username == namespace)
|
joined = joined.where(User.username == namespace)
|
||||||
|
|
||||||
return joined.where(
|
return list(joined.where(
|
||||||
LogEntry.datetime >= start_time,
|
LogEntry.datetime >= start_time,
|
||||||
LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc())
|
LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc()))
|
||||||
|
|
||||||
|
|
||||||
def log_action(kind_name, user_or_organization_name, performer=None,
|
def log_action(kind_name, user_or_organization_name, performer=None,
|
||||||
|
|
|
@ -73,7 +73,7 @@ class RepositoryImage(RepositoryParamResource):
|
||||||
@nickname('getImage')
|
@nickname('getImage')
|
||||||
def get(self, namespace, repository, image_id):
|
def get(self, namespace, repository, image_id):
|
||||||
""" Get the information available for the specified image. """
|
""" Get the information available for the specified image. """
|
||||||
image = model.get_repo_image(namespace, repository, image_id)
|
image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if not image:
|
if not image:
|
||||||
raise NotFound()
|
raise NotFound()
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ class RepositoryImageChanges(RepositoryParamResource):
|
||||||
@nickname('getImageChanges')
|
@nickname('getImageChanges')
|
||||||
def get(self, namespace, repository, image_id):
|
def get(self, namespace, repository, image_id):
|
||||||
""" Get the list of changes for the specified image. """
|
""" Get the list of changes for the specified image. """
|
||||||
image = model.get_repo_image(namespace, repository, image_id)
|
image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
|
|
||||||
if not image:
|
if not image:
|
||||||
raise NotFound()
|
raise NotFound()
|
||||||
|
|
|
@ -223,13 +223,20 @@ def create_repository(namespace, repository):
|
||||||
repo = model.create_repository(namespace, repository,
|
repo = model.create_repository(namespace, repository,
|
||||||
get_authenticated_user())
|
get_authenticated_user())
|
||||||
|
|
||||||
profile.debug('Determining added images')
|
profile.debug('Determining already added images')
|
||||||
added_images = OrderedDict([(desc['id'], desc)
|
added_images = OrderedDict([(desc['id'], desc) for desc in image_descriptions])
|
||||||
for desc in image_descriptions])
|
|
||||||
new_repo_images = dict(added_images)
|
new_repo_images = dict(added_images)
|
||||||
|
|
||||||
for existing in model.get_repository_images(namespace, repository):
|
# Optimization: Lookup any existing images in the repository with matching docker IDs and
|
||||||
if existing.docker_image_id in new_repo_images:
|
# remove them from the added dict, so we don't need to look them up one-by-one.
|
||||||
|
def chunks(l, n):
|
||||||
|
for i in xrange(0, len(l), n):
|
||||||
|
yield l[i:i+n]
|
||||||
|
|
||||||
|
# Note: We do this in chunks in an effort to not hit the SQL query size limit.
|
||||||
|
for chunk in chunks(new_repo_images.keys(), 50):
|
||||||
|
existing_images = model.lookup_repository_images(namespace, repository, chunk)
|
||||||
|
for existing in existing_images:
|
||||||
added_images.pop(existing.docker_image_id)
|
added_images.pop(existing.docker_image_id)
|
||||||
|
|
||||||
profile.debug('Creating/Linking necessary images')
|
profile.debug('Creating/Linking necessary images')
|
||||||
|
@ -241,9 +248,8 @@ def create_repository(namespace, repository):
|
||||||
|
|
||||||
|
|
||||||
profile.debug('Created images')
|
profile.debug('Created images')
|
||||||
response = make_response('Created', 201)
|
|
||||||
track_and_log('push_repo', repo)
|
track_and_log('push_repo', repo)
|
||||||
return response
|
return make_response('Created', 201)
|
||||||
|
|
||||||
|
|
||||||
@index.route('/repositories/<path:repository>/images', methods=['PUT'])
|
@index.route('/repositories/<path:repository>/images', methods=['PUT'])
|
||||||
|
|
|
@ -7,13 +7,13 @@ from functools import wraps
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
from app import storage as store, image_diff_queue
|
from app import storage as store, image_diff_queue, app
|
||||||
from auth.auth import process_auth, extract_namespace_repo_from_session
|
from auth.auth import process_auth, extract_namespace_repo_from_session
|
||||||
from util import checksums, changes
|
from util import checksums, changes
|
||||||
from util.http import abort, exact_abort
|
from util.http import abort, exact_abort
|
||||||
from auth.permissions import (ReadRepositoryPermission,
|
from auth.permissions import (ReadRepositoryPermission,
|
||||||
ModifyRepositoryPermission)
|
ModifyRepositoryPermission)
|
||||||
from data import model
|
from data import model, database
|
||||||
from util import gzipstream
|
from util import gzipstream
|
||||||
|
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ def require_completion(f):
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def wrapper(namespace, repository, *args, **kwargs):
|
def wrapper(namespace, repository, *args, **kwargs):
|
||||||
image_id = kwargs['image_id']
|
image_id = kwargs['image_id']
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if image_is_uploading(repo_image):
|
if image_is_uploading(repo_image):
|
||||||
abort(400, 'Image %(image_id)s is being uploaded, retry later',
|
abort(400, 'Image %(image_id)s is being uploaded, retry later',
|
||||||
issue='upload-in-progress', image_id=kwargs['image_id'])
|
issue='upload-in-progress', image_id=kwargs['image_id'])
|
||||||
|
@ -103,7 +103,7 @@ def head_image_layer(namespace, repository, image_id, headers):
|
||||||
profile.debug('Checking repo permissions')
|
profile.debug('Checking repo permissions')
|
||||||
if permission.can() or model.repository_is_public(namespace, repository):
|
if permission.can() or model.repository_is_public(namespace, repository):
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if not repo_image:
|
if not repo_image:
|
||||||
profile.debug('Image not found')
|
profile.debug('Image not found')
|
||||||
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
||||||
|
@ -136,7 +136,7 @@ def get_image_layer(namespace, repository, image_id, headers):
|
||||||
profile.debug('Checking repo permissions')
|
profile.debug('Checking repo permissions')
|
||||||
if permission.can() or model.repository_is_public(namespace, repository):
|
if permission.can() or model.repository_is_public(namespace, repository):
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
|
|
||||||
profile.debug('Looking up the layer path')
|
profile.debug('Looking up the layer path')
|
||||||
try:
|
try:
|
||||||
|
@ -151,6 +151,10 @@ def get_image_layer(namespace, repository, image_id, headers):
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
profile.debug('Streaming layer data')
|
profile.debug('Streaming layer data')
|
||||||
|
|
||||||
|
# Close the database handle here for this process before we send the long download.
|
||||||
|
database.close_db_filter(None)
|
||||||
|
|
||||||
return Response(store.stream_read(repo_image.storage.locations, path), headers=headers)
|
return Response(store.stream_read(repo_image.storage.locations, path), headers=headers)
|
||||||
except (IOError, AttributeError):
|
except (IOError, AttributeError):
|
||||||
profile.debug('Image not found')
|
profile.debug('Image not found')
|
||||||
|
@ -170,7 +174,7 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
profile.debug('Retrieving image')
|
profile.debug('Retrieving image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
try:
|
try:
|
||||||
profile.debug('Retrieving image data')
|
profile.debug('Retrieving image data')
|
||||||
uuid = repo_image.storage.uuid
|
uuid = repo_image.storage.uuid
|
||||||
|
@ -213,7 +217,8 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
sr.add_handler(sum_hndlr)
|
sr.add_handler(sum_hndlr)
|
||||||
|
|
||||||
# Stream write the data to storage.
|
# Stream write the data to storage.
|
||||||
store.stream_write(repo_image.storage.locations, layer_path, sr)
|
with database.CloseForLongOperation(app.config):
|
||||||
|
store.stream_write(repo_image.storage.locations, layer_path, sr)
|
||||||
|
|
||||||
# Append the computed checksum.
|
# Append the computed checksum.
|
||||||
csums = []
|
csums = []
|
||||||
|
@ -293,7 +298,7 @@ def put_image_checksum(namespace, repository, image_id):
|
||||||
issue='missing-checksum-cookie', image_id=image_id)
|
issue='missing-checksum-cookie', image_id=image_id)
|
||||||
|
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if not repo_image or not repo_image.storage:
|
if not repo_image or not repo_image.storage:
|
||||||
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
|
abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
|
||||||
|
|
||||||
|
@ -348,7 +353,7 @@ def get_image_json(namespace, repository, image_id, headers):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
|
|
||||||
profile.debug('Looking up repo layer data')
|
profile.debug('Looking up repo layer data')
|
||||||
try:
|
try:
|
||||||
|
@ -379,7 +384,7 @@ def get_image_ancestry(namespace, repository, image_id, headers):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
|
|
||||||
profile.debug('Looking up image data')
|
profile.debug('Looking up image data')
|
||||||
try:
|
try:
|
||||||
|
@ -443,7 +448,7 @@ def put_image_json(namespace, repository, image_id):
|
||||||
issue='invalid-request', image_id=image_id)
|
issue='invalid-request', image_id=image_id)
|
||||||
|
|
||||||
profile.debug('Looking up repo image')
|
profile.debug('Looking up repo image')
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if not repo_image:
|
if not repo_image:
|
||||||
profile.debug('Image not found')
|
profile.debug('Image not found')
|
||||||
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
|
||||||
|
@ -460,7 +465,7 @@ def put_image_json(namespace, repository, image_id):
|
||||||
parent_image = None
|
parent_image = None
|
||||||
if parent_id:
|
if parent_id:
|
||||||
profile.debug('Looking up parent image')
|
profile.debug('Looking up parent image')
|
||||||
parent_image = model.get_repo_image(namespace, repository, parent_id)
|
parent_image = model.get_repo_image_extended(namespace, repository, parent_id)
|
||||||
|
|
||||||
parent_uuid = parent_image and parent_image.storage.uuid
|
parent_uuid = parent_image and parent_image.storage.uuid
|
||||||
parent_locations = parent_image and parent_image.storage.locations
|
parent_locations = parent_image and parent_image.storage.locations
|
||||||
|
@ -513,7 +518,7 @@ def put_image_json(namespace, repository, image_id):
|
||||||
def process_image_changes(namespace, repository, image_id):
|
def process_image_changes(namespace, repository, image_id):
|
||||||
logger.debug('Generating diffs for image: %s' % image_id)
|
logger.debug('Generating diffs for image: %s' % image_id)
|
||||||
|
|
||||||
repo_image = model.get_repo_image(namespace, repository, image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
|
||||||
if not repo_image:
|
if not repo_image:
|
||||||
logger.warning('No image for id: %s', image_id)
|
logger.warning('No image for id: %s', image_id)
|
||||||
return None, None
|
return None, None
|
||||||
|
|
|
@ -21,24 +21,28 @@ from util.dockerloadformat import build_docker_load_stream
|
||||||
verbs = Blueprint('verbs', __name__)
|
verbs = Blueprint('verbs', __name__)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
|
def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_id_list):
|
||||||
store = Storage(app)
|
store = Storage(app)
|
||||||
|
|
||||||
|
# For performance reasons, we load the full image list here, cache it, then disconnect from
|
||||||
|
# the database.
|
||||||
|
with database.UseThenDisconnect(app.config):
|
||||||
|
image_list = model.get_matching_repository_images(namespace, repository, image_id_list)
|
||||||
|
|
||||||
def get_next_image():
|
def get_next_image():
|
||||||
for current_image_id in image_list:
|
for current_image in image_list:
|
||||||
yield model.get_repo_image(namespace, repository, current_image_id)
|
yield current_image
|
||||||
|
|
||||||
def get_next_layer():
|
def get_next_layer():
|
||||||
for current_image_id in image_list:
|
for current_image_entry in image_list:
|
||||||
current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
|
|
||||||
current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
|
current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
|
||||||
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
|
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
|
||||||
current_image_path)
|
current_image_path)
|
||||||
|
|
||||||
|
current_image_id = current_image_entry.id
|
||||||
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
|
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
|
||||||
yield current_image_stream
|
yield current_image_stream
|
||||||
|
|
||||||
database.configure(app.config)
|
|
||||||
stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
|
stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
|
||||||
get_next_image, get_next_layer)
|
get_next_image, get_next_layer)
|
||||||
|
|
||||||
|
@ -46,12 +50,13 @@ def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, ima
|
||||||
|
|
||||||
|
|
||||||
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
|
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
|
||||||
database.configure(app.config)
|
|
||||||
store = Storage(app)
|
store = Storage(app)
|
||||||
|
|
||||||
def handle_exception(ex):
|
def handle_exception(ex):
|
||||||
logger.debug('Exception when building squashed image %s: %s', linked_storage_uuid, ex)
|
logger.debug('Exception when building squashed image %s: %s', linked_storage_uuid, ex)
|
||||||
model.delete_derived_storage_by_uuid(linked_storage_uuid)
|
|
||||||
|
with database.UseThenDisconnect(app.config):
|
||||||
|
model.delete_derived_storage_by_uuid(linked_storage_uuid)
|
||||||
|
|
||||||
queue_file.add_exception_handler(handle_exception)
|
queue_file.add_exception_handler(handle_exception)
|
||||||
|
|
||||||
|
@ -60,9 +65,10 @@ def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, que
|
||||||
queue_file.close()
|
queue_file.close()
|
||||||
|
|
||||||
if not queue_file.raised_exception:
|
if not queue_file.raised_exception:
|
||||||
done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
|
with database.UseThenDisconnect(app.config):
|
||||||
done_uploading.uploading = False
|
done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
|
||||||
done_uploading.save()
|
done_uploading.uploading = False
|
||||||
|
done_uploading.save()
|
||||||
|
|
||||||
|
|
||||||
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
|
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
|
||||||
|
@ -77,7 +83,7 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
# Lookup the tag's image and storage.
|
# Lookup the tag's image and storage.
|
||||||
repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
|
repo_image = model.get_repo_image_extended(namespace, repository, tag_image.docker_image_id)
|
||||||
if not repo_image:
|
if not repo_image:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
|
@ -95,6 +101,9 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
|
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
|
||||||
return redirect(download_url)
|
return redirect(download_url)
|
||||||
|
|
||||||
|
# Close the database handle here for this process before we send the long download.
|
||||||
|
database.close_db_filter(None)
|
||||||
|
|
||||||
logger.debug('Sending cached derived image %s', derived.uuid)
|
logger.debug('Sending cached derived image %s', derived.uuid)
|
||||||
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
|
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
|
||||||
|
|
||||||
|
@ -132,6 +141,9 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
||||||
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
|
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
|
||||||
|
|
||||||
|
# Close the database handle here for this process before we send the long download.
|
||||||
|
database.close_db_filter(None)
|
||||||
|
|
||||||
# Return the client's data.
|
# Return the client's data.
|
||||||
return send_file(client_queue_file)
|
return send_file(client_queue_file)
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ bad_count = 0
|
||||||
good_count = 0
|
good_count = 0
|
||||||
|
|
||||||
def resolve_or_create(repo, docker_image_id, new_ancestry):
|
def resolve_or_create(repo, docker_image_id, new_ancestry):
|
||||||
existing = model.get_repo_image(repo.namespace_user.username, repo.name, docker_image_id)
|
existing = model.get_repo_image_extended(repo.namespace_user.username, repo.name, docker_image_id)
|
||||||
if existing:
|
if existing:
|
||||||
logger.debug('Found existing image: %s, %s', existing.id, docker_image_id)
|
logger.debug('Found existing image: %s, %s', existing.id, docker_image_id)
|
||||||
return existing
|
return existing
|
||||||
|
@ -63,7 +63,7 @@ def all_ancestors_exist(ancestors):
|
||||||
cant_fix = []
|
cant_fix = []
|
||||||
for img in query:
|
for img in query:
|
||||||
try:
|
try:
|
||||||
with_locations = model.get_repo_image(img.repository.namespace_user.username,
|
with_locations = model.get_repo_image_extended(img.repository.namespace_user.username,
|
||||||
img.repository.name, img.docker_image_id)
|
img.repository.name, img.docker_image_id)
|
||||||
ancestry_storage = store.image_ancestry_path(img.storage.uuid)
|
ancestry_storage = store.image_ancestry_path(img.storage.uuid)
|
||||||
if store.exists(with_locations.storage.locations, ancestry_storage):
|
if store.exists(with_locations.storage.locations, ancestry_storage):
|
||||||
|
|
Reference in a new issue