diff --git a/data/database.py b/data/database.py
index 6562e1f63..6a1544e76 100644
--- a/data/database.py
+++ b/data/database.py
@@ -35,6 +35,36 @@ class CallableProxy(Proxy):
       raise AttributeError('Cannot use uninitialized Proxy.')
     return self.obj(*args, **kwargs)
 
+
+class CloseForLongOperation(object):
+  """ Helper object which disconnects the database then reconnects after the nested operation
+      completes.
+  """
+
+  def __init__(self, config_object):
+    self.config_object = config_object
+
+  def __enter__(self):
+    close_db_filter(None)
+
+  def __exit__(self, type, value, traceback):
+    # Note: Nothing to do. The next SQL call will reconnect automatically.
+    pass
+
+
+class UseThenDisconnect(object):
+  """ Helper object for conducting work with a database and then tearing it down. """
+
+  def __init__(self, config_object):
+    self.config_object = config_object
+
+  def __enter__(self):
+    configure(self.config_object)
+
+  def __exit__(self, type, value, traceback):
+    close_db_filter(None)
+
+
 db = Proxy()
 read_slave = Proxy()
 db_random_func = CallableProxy()
@@ -56,6 +86,7 @@ def _db_from_url(url, db_kwargs):
 
 
 def configure(config_object):
+  logger.debug('Configuring database')
   db_kwargs = dict(config_object['DB_CONNECTION_ARGS'])
   write_db_uri = config_object['DB_URI']
   db.initialize(_db_from_url(write_db_uri, db_kwargs))
diff --git a/data/model/legacy.py b/data/model/legacy.py
index b7a20d1cd..4511769d8 100644
--- a/data/model/legacy.py
+++ b/data/model/legacy.py
@@ -1035,16 +1035,26 @@ def get_repository(namespace_name, repository_name):
     return None
 
 
-def get_repo_image(namespace_name, repository_name, image_id):
+def get_repo_image(namespace_name, repository_name, docker_image_id):
   def limit_to_image_id(query):
-    return query.where(Image.docker_image_id == image_id)
+    return query.where(Image.docker_image_id == docker_image_id).limit(1)
+
+  query = _get_repository_images(namespace_name, repository_name, limit_to_image_id)
+  try:
+    return query.get()
+  except Image.DoesNotExist:
+    return None
+
+
+def get_repo_image_extended(namespace_name, repository_name, docker_image_id):
+  def limit_to_image_id(query):
+    return query.where(Image.docker_image_id == docker_image_id).limit(1)
 
   images = _get_repository_images_base(namespace_name, repository_name, limit_to_image_id)
   if not images:
     return None
-  else:
-    return images[0]
+  return images[0]
 
 
 def repository_is_public(namespace_name, repository_name):
   try:
@@ -1137,20 +1147,21 @@ def __translate_ancestry(old_ancestry, translations, repository, username, prefe
   if old_ancestry == '/':
     return '/'
 
-  def translate_id(old_id):
+  def translate_id(old_id, docker_image_id):
     logger.debug('Translating id: %s', old_id)
     if old_id not in translations:
-      # Figure out which docker_image_id the old id refers to, then find a
-      # a local one
-      old = Image.select(Image.docker_image_id).where(Image.id == old_id).get()
-      image_in_repo = find_create_or_link_image(old.docker_image_id, repository, username,
+      image_in_repo = find_create_or_link_image(docker_image_id, repository, username,
                                                 translations, preferred_location)
       translations[old_id] = image_in_repo.id
-
     return translations[old_id]
 
+  # Select all the ancestor Docker IDs in a single query.
   old_ids = [int(id_str) for id_str in old_ancestry.split('/')[1:-1]]
-  new_ids = [str(translate_id(old_id)) for old_id in old_ids]
+  query = Image.select(Image.id, Image.docker_image_id).where(Image.id << old_ids)
+  old_images = {i.id: i.docker_image_id for i in query}
+
+  # Translate the old images into new ones.
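+  # Note: every ancestor id in old_ancestry should be present in old_images; a missing
+  # key here would mean the ancestry string references an Image row that no longer exists.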
+  new_ids = [str(translate_id(old_id, old_images[old_id])) for old_id in old_ids]
   return '/%s/' % '/'.join(new_ids)
 
 
@@ -1162,36 +1173,22 @@ def _create_storage(location_name):
   return storage
 
 
-def find_create_or_link_image(docker_image_id, repository, username, translations,
-                              preferred_location):
+def _find_or_link_image(existing_image, repository, username, translations, preferred_location):
+  # TODO(jake): This call is currently recursively done under a single transaction. Can we make
+  # it instead be done under a set of transactions?
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
+    # Check for an existing image, under the transaction, to make sure it doesn't already exist.
     repo_image = get_repo_image(repository.namespace_user.username, repository.name,
-                                docker_image_id)
+                                existing_image.docker_image_id)
     if repo_image:
       return repo_image
 
-    query = (Image
-             .select(Image, ImageStorage)
-             .distinct()
-             .join(ImageStorage)
-             .switch(Image)
-             .join(Repository)
-             .join(Visibility)
-             .switch(Repository)
-             .join(RepositoryPermission, JOIN_LEFT_OUTER)
-             .switch(Repository)
-             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
-             .where(ImageStorage.uploading == False))
-
-    query = (_filter_to_repos_for_user(query, username)
-             .where(Image.docker_image_id == docker_image_id))
-
-    new_image_ancestry = '/'
-    origin_image_id = None
+    # Make sure the existing base image still exists.
     try:
-      to_copy = query.get()
+      to_copy = Image.select().join(ImageStorage).where(Image.id == existing_image.id).get()
+
       msg = 'Linking image to existing storage with docker id: %s and uuid: %s'
-      logger.debug(msg, docker_image_id, to_copy.storage.uuid)
+      logger.debug(msg, existing_image.docker_image_id, to_copy.storage.uuid)
 
       new_image_ancestry = __translate_ancestry(to_copy.ancestors, translations, repository,
                                                 username, preferred_location)
@@ -1199,25 +1196,73 @@ def find_create_or_link_image(docker_image_id, repository, username, translation
 
       storage = to_copy.storage
       storage.locations = {placement.location.name
                            for placement in storage.imagestorageplacement_set}
-      origin_image_id = to_copy.id
+
+      new_image = Image.create(docker_image_id=existing_image.docker_image_id,
+                               repository=repository, storage=storage,
+                               ancestors=new_image_ancestry)
+
+      logger.debug('Storing translation %s -> %s', existing_image.id, new_image.id)
+      translations[existing_image.id] = new_image.id
+      return new_image
     except Image.DoesNotExist:
-      logger.debug('Creating new storage for docker id: %s', docker_image_id)
-      storage = _create_storage(preferred_location)
-
-      logger.debug('Storage locations: %s', storage.locations)
-
-      new_image = Image.create(docker_image_id=docker_image_id,
-                               repository=repository, storage=storage,
-                               ancestors=new_image_ancestry)
-
-      logger.debug('new_image storage locations: %s', new_image.storage.locations)
+      return None
 
-    if origin_image_id:
-      logger.debug('Storing translation %s -> %s', origin_image_id, new_image.id)
-      translations[origin_image_id] = new_image.id
 
+def find_create_or_link_image(docker_image_id, repository, username, translations,
+                              preferred_location):
-    return new_image
+  # First check for the image existing in the repository. If found, we simply return it.
+  repo_image = get_repo_image(repository.namespace_user.username, repository.name,
+                              docker_image_id)
+  if repo_image:
+    return repo_image
+
+  # We next check to see if there is an existing storage the new image can link to.
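+  # The candidate query below joins through Repository and RepositoryPermission and is
+  # narrowed by _filter_to_repos_for_user, so we only ever link to storage the user is
+  # allowed to read; uploading == False keeps partially-written storages out.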
+  existing_image_query = (Image
+                          .select(Image, ImageStorage)
+                          .distinct()
+                          .join(ImageStorage)
+                          .switch(Image)
+                          .join(Repository)
+                          .join(Visibility)
+                          .switch(Repository)
+                          .join(RepositoryPermission, JOIN_LEFT_OUTER)
+                          .switch(Repository)
+                          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+                          .where(ImageStorage.uploading == False))
+
+  existing_image_query = (_filter_to_repos_for_user(existing_image_query, username)
+                          .where(Image.docker_image_id == docker_image_id))
+
+  # If there is an existing image, we try to translate its ancestry and copy its storage.
+  new_image = None
+  try:
+    logger.debug('Looking up existing image for ID: %s', docker_image_id)
+    existing_image = existing_image_query.get()
+
+    logger.debug('Existing image %s found for ID: %s', existing_image.id, docker_image_id)
+    new_image = _find_or_link_image(existing_image, repository, username, translations,
+                                    preferred_location)
+    if new_image:
+      return new_image
+  except Image.DoesNotExist:
+    logger.debug('No existing image found for ID: %s', docker_image_id)
+    pass
+
+  # Otherwise, create a new storage directly.
+  with config.app_config['DB_TRANSACTION_FACTORY'](db):
+    # Final check for an existing image, under the transaction.
+    repo_image = get_repo_image(repository.namespace_user.username, repository.name,
+                                docker_image_id)
+    if repo_image:
+      return repo_image
+
+    logger.debug('Creating new storage for docker id: %s', docker_image_id)
+    storage = _create_storage(preferred_location)
+
+    return Image.create(docker_image_id=docker_image_id,
+                        repository=repository, storage=storage,
+                        ancestors='/')
 
 
 def find_or_create_derived_storage(source, transformation_name, preferred_location):
@@ -1331,6 +1376,15 @@ def set_image_metadata(docker_image_id, namespace_name, repository_name, created
     fetched.storage.save()
     return fetched
 
+def _get_repository_images(namespace_name, repository_name, query_modifier):
+  query = (Image
+           .select()
+           .join(Repository)
+           .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+           .where(Repository.name == repository_name, Namespace.username == namespace_name))
+
+  query = query_modifier(query)
+  return query
 
 def _get_repository_images_base(namespace_name, repository_name, query_modifier):
   query = (ImageStoragePlacement
@@ -1367,6 +1421,20 @@ def _get_repository_images_base(namespace_name, repository_name, query_modifier)
 
   return images.values()
 
+def lookup_repository_images(namespace_name, repository_name, docker_image_ids):
+  return (Image
+          .select()
+          .join(Repository)
+          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
+          .where(Repository.name == repository_name, Namespace.username == namespace_name,
+                 Image.docker_image_id << docker_image_ids))
+
+
+def get_matching_repository_images(namespace_name, repository_name, docker_image_ids):
+  def modify_query(q):
+    return q.where(Image.docker_image_id << docker_image_ids)
+
+  return _get_repository_images_base(namespace_name, repository_name, modify_query)
 
 def get_repository_images(namespace_name, repository_name):
   return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
@@ -1385,6 +1453,9 @@ def garbage_collect_repository(namespace_name, repository_name):
   storage_id_whitelist = {}
 
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
+    # TODO(jake): We could probably select this and all the images in a single query using
+    # a different kind of join.
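+    # The approach below: collect every ancestor id referenced by any tag, then remove the
+    # repository images whose ids never appear in that referenced set.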
+
     # Get a list of all images used by tags in the repository
     tag_query = (RepositoryTag
                  .select(RepositoryTag, Image, ImageStorage)
@@ -1403,7 +1474,7 @@ def garbage_collect_repository(namespace_name, repository_name):
       referenced_anscestors = referenced_anscestors.union(set(ancestor_list))
       referenced_anscestors.add(tag.image.id)
 
-    all_repo_images = get_repository_images(namespace_name, repository_name)
+    all_repo_images = _get_repository_images(namespace_name, repository_name, lambda q: q)
     all_images = {int(img.id): img for img in all_repo_images}
     to_remove = set(all_images.keys()).difference(referenced_anscestors)
 
@@ -1461,8 +1532,8 @@ def garbage_collect_storage(storage_id_whitelist):
 
   # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
   # storage without any placement is invalid, and a placement cannot exist without a storage.
-  # TODO: We might want to allow for null storages on placements, which would allow us to delete
-  # the storages, then delete the placements in a non-transaction.
+  # TODO(jake): We might want to allow for null storages on placements, which would allow us to
+  # delete the storages, then delete the placements in a non-transaction.
   logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Track all of the data that should be removed from blob storage
@@ -1516,7 +1587,7 @@ def get_tag_image(namespace_name, repository_name, tag_name):
 
 
 def get_image_by_id(namespace_name, repository_name, docker_image_id):
-  image = get_repo_image(namespace_name, repository_name, docker_image_id)
+  image = get_repo_image_extended(namespace_name, repository_name, docker_image_id)
   if not image:
     raise DataModelException('Unable to find image \'%s\' for repo \'%s/%s\'' %
                              (docker_image_id, namespace_name, repository_name))
@@ -1747,11 +1818,10 @@ def get_repository_delegate_tokens(namespace_name, repository_name):
 
 
 def get_repo_delegate_token(namespace_name, repository_name, code):
   repo_query = get_repository_delegate_tokens(namespace_name, repository_name)
-  found = list(repo_query.where(AccessToken.code == code))
 
-  if found:
-    return found[0]
-  else:
+  try:
+    return repo_query.where(AccessToken.code == code).get()
+  except AccessToken.DoesNotExist:
     raise InvalidTokenException('Unable to find token with code: %s' % code)
 
@@ -1920,9 +1990,9 @@ def list_logs(start_time, end_time, performer=None, repository=None, namespace=N
   if namespace:
     joined = joined.where(User.username == namespace)
 
-  return joined.where(
+  return list(joined.where(
     LogEntry.datetime >= start_time,
-    LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc())
+    LogEntry.datetime < end_time).order_by(LogEntry.datetime.desc()))
 
 
 def log_action(kind_name, user_or_organization_name, performer=None,
diff --git a/endpoints/api/image.py b/endpoints/api/image.py
index 3a6c62507..c952cc5d7 100644
--- a/endpoints/api/image.py
+++ b/endpoints/api/image.py
@@ -73,7 +73,7 @@ class RepositoryImage(RepositoryParamResource):
   @nickname('getImage')
   def get(self, namespace, repository, image_id):
     """ Get the information available for the specified image. """
-    image = model.get_repo_image(namespace, repository, image_id)
+    image = model.get_repo_image_extended(namespace, repository, image_id)
     if not image:
       raise NotFound()
 
@@ -94,7 +94,7 @@ class RepositoryImageChanges(RepositoryParamResource):
   @nickname('getImageChanges')
   def get(self, namespace, repository, image_id):
     """ Get the list of changes for the specified image. """
-    image = model.get_repo_image(namespace, repository, image_id)
+    image = model.get_repo_image_extended(namespace, repository, image_id)
     if not image:
       raise NotFound()
 
diff --git a/endpoints/index.py b/endpoints/index.py
index 1c8e551a0..d97dd94f5 100644
--- a/endpoints/index.py
+++ b/endpoints/index.py
@@ -223,13 +223,20 @@ def create_repository(namespace, repository):
     repo = model.create_repository(namespace, repository,
                                    get_authenticated_user())
 
-  profile.debug('Determining added images')
-  added_images = OrderedDict([(desc['id'], desc)
-                              for desc in image_descriptions])
+  profile.debug('Determining already added images')
+  added_images = OrderedDict([(desc['id'], desc) for desc in image_descriptions])
   new_repo_images = dict(added_images)
 
-  for existing in model.get_repository_images(namespace, repository):
-    if existing.docker_image_id in new_repo_images:
+  # Optimization: Look up any existing images in the repository with matching docker IDs and
+  # remove them from the added dict, so we don't need to look them up one-by-one.
+  def chunks(l, n):
+    for i in xrange(0, len(l), n):
+      yield l[i:i+n]
+
+  # Note: We do this in chunks in an effort to not hit the SQL query size limit.
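+  # With 50 ids per chunk, a push with N images costs ceil(N / 50) lookup queries
+  # instead of one query per image.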
""" - image = model.get_repo_image(namespace, repository, image_id) + image = model.get_repo_image_extended(namespace, repository, image_id) if not image: raise NotFound() @@ -94,7 +94,7 @@ class RepositoryImageChanges(RepositoryParamResource): @nickname('getImageChanges') def get(self, namespace, repository, image_id): """ Get the list of changes for the specified image. """ - image = model.get_repo_image(namespace, repository, image_id) + image = model.get_repo_image_extended(namespace, repository, image_id) if not image: raise NotFound() diff --git a/endpoints/index.py b/endpoints/index.py index 1c8e551a0..d97dd94f5 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -223,13 +223,20 @@ def create_repository(namespace, repository): repo = model.create_repository(namespace, repository, get_authenticated_user()) - profile.debug('Determining added images') - added_images = OrderedDict([(desc['id'], desc) - for desc in image_descriptions]) + profile.debug('Determining already added images') + added_images = OrderedDict([(desc['id'], desc) for desc in image_descriptions]) new_repo_images = dict(added_images) - for existing in model.get_repository_images(namespace, repository): - if existing.docker_image_id in new_repo_images: + # Optimization: Lookup any existing images in the repository with matching docker IDs and + # remove them from the added dict, so we don't need to look them up one-by-one. + def chunks(l, n): + for i in xrange(0, len(l), n): + yield l[i:i+n] + + # Note: We do this in chunks in an effort to not hit the SQL query size limit. + for chunk in chunks(new_repo_images.keys(), 50): + existing_images = model.lookup_repository_images(namespace, repository, chunk) + for existing in existing_images: added_images.pop(existing.docker_image_id) profile.debug('Creating/Linking necessary images') @@ -241,9 +248,8 @@ def create_repository(namespace, repository): profile.debug('Created images') - response = make_response('Created', 201) track_and_log('push_repo', repo) - return response + return make_response('Created', 201) @index.route('/repositories/<path:repository>/images', methods=['PUT']) diff --git a/endpoints/registry.py b/endpoints/registry.py index 48943bf4c..77ddbec68 100644 --- a/endpoints/registry.py +++ b/endpoints/registry.py @@ -7,13 +7,13 @@ from functools import wraps from datetime import datetime from time import time -from app import storage as store, image_diff_queue +from app import storage as store, image_diff_queue, app from auth.auth import process_auth, extract_namespace_repo_from_session from util import checksums, changes from util.http import abort, exact_abort from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) -from data import model +from data import model, database from util import gzipstream @@ -59,7 +59,7 @@ def require_completion(f): @wraps(f) def wrapper(namespace, repository, *args, **kwargs): image_id = kwargs['image_id'] - repo_image = model.get_repo_image(namespace, repository, image_id) + repo_image = model.get_repo_image_extended(namespace, repository, image_id) if image_is_uploading(repo_image): abort(400, 'Image %(image_id)s is being uploaded, retry later', issue='upload-in-progress', image_id=kwargs['image_id']) @@ -103,7 +103,7 @@ def head_image_layer(namespace, repository, image_id, headers): profile.debug('Checking repo permissions') if permission.can() or model.repository_is_public(namespace, repository): profile.debug('Looking up repo image') - repo_image = model.get_repo_image(namespace, 
+      database.close_db_filter(None)
+
       return Response(store.stream_read(repo_image.storage.locations, path), headers=headers)
     except (IOError, AttributeError):
       profile.debug('Image not found')
@@ -170,7 +174,7 @@ def put_image_layer(namespace, repository, image_id):
     abort(403)
 
   profile.debug('Retrieving image')
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
   try:
     profile.debug('Retrieving image data')
     uuid = repo_image.storage.uuid
@@ -213,7 +217,8 @@ def put_image_layer(namespace, repository, image_id):
     sr.add_handler(sum_hndlr)
 
   # Stream write the data to storage.
-  store.stream_write(repo_image.storage.locations, layer_path, sr)
+  with database.CloseForLongOperation(app.config):
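+    # The database connection stays closed while the layer streams to storage; per
+    # CloseForLongOperation, the next SQL call after the block reconnects automatically.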
+    store.stream_write(repo_image.storage.locations, layer_path, sr)
 
   # Append the computed checksum.
   csums = []
@@ -293,7 +298,7 @@ def put_image_checksum(namespace, repository, image_id):
           issue='missing-checksum-cookie', image_id=image_id)
 
   profile.debug('Looking up repo image')
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
   if not repo_image or not repo_image.storage:
     abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id)
 
@@ -348,7 +353,7 @@ def get_image_json(namespace, repository, image_id, headers):
     abort(403)
 
   profile.debug('Looking up repo image')
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
 
   profile.debug('Looking up repo layer data')
   try:
@@ -379,7 +384,7 @@ def get_image_ancestry(namespace, repository, image_id, headers):
     abort(403)
 
   profile.debug('Looking up repo image')
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
 
   profile.debug('Looking up image data')
   try:
@@ -443,7 +448,7 @@ def put_image_json(namespace, repository, image_id):
           issue='invalid-request', image_id=image_id)
 
   profile.debug('Looking up repo image')
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
   if not repo_image:
     profile.debug('Image not found')
     abort(404, 'Image %(image_id)s not found', issue='unknown-image',
@@ -460,7 +465,7 @@ def put_image_json(namespace, repository, image_id):
   parent_image = None
   if parent_id:
     profile.debug('Looking up parent image')
-    parent_image = model.get_repo_image(namespace, repository, parent_id)
+    parent_image = model.get_repo_image_extended(namespace, repository, parent_id)
 
   parent_uuid = parent_image and parent_image.storage.uuid
   parent_locations = parent_image and parent_image.storage.locations
@@ -513,7 +518,7 @@ def put_image_json(namespace, repository, image_id):
 
 def process_image_changes(namespace, repository, image_id):
   logger.debug('Generating diffs for image: %s' % image_id)
-  repo_image = model.get_repo_image(namespace, repository, image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, image_id)
   if not repo_image:
     logger.warning('No image for id: %s', image_id)
     return None, None
diff --git a/endpoints/verbs.py b/endpoints/verbs.py
index b410251d3..33a214788 100644
--- a/endpoints/verbs.py
+++ b/endpoints/verbs.py
@@ -21,24 +21,28 @@ from util.dockerloadformat import build_docker_load_stream
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
-def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
+def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_id_list):
   store = Storage(app)
 
+  # For performance reasons, we load the full image list here, cache it, then disconnect from
+  # the database.
+  with database.UseThenDisconnect(app.config):
+    image_list = model.get_matching_repository_images(namespace, repository, image_id_list)
+
   def get_next_image():
-    for current_image_id in image_list:
-      yield model.get_repo_image(namespace, repository, current_image_id)
+    for current_image in image_list:
+      yield current_image
 
   def get_next_layer():
-    for current_image_id in image_list:
-      current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
+    for current_image_entry in image_list:
       current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
       current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
                                                     current_image_path)
 
+      current_image_id = current_image_entry.id
       logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
       yield current_image_stream
 
-  database.configure(app.config)
   stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
                                     get_next_image, get_next_layer)
 
@@ -46,12 +50,13 @@ def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, ima
 
 
 def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
-  database.configure(app.config)
   store = Storage(app)
 
   def handle_exception(ex):
     logger.debug('Exception when building squashed image %s: %s', linked_storage_uuid, ex)
-    model.delete_derived_storage_by_uuid(linked_storage_uuid)
+
+    with database.UseThenDisconnect(app.config):
+      model.delete_derived_storage_by_uuid(linked_storage_uuid)
 
   queue_file.add_exception_handler(handle_exception)
 
@@ -60,9 +65,10 @@ def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, que
   queue_file.close()
 
   if not queue_file.raised_exception:
-    done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
-    done_uploading.uploading = False
-    done_uploading.save()
+    with database.UseThenDisconnect(app.config):
+      done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
+      done_uploading.uploading = False
+      done_uploading.save()
 
 
 @verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@@ -77,7 +83,7 @@ def get_squashed_tag(namespace, repository, tag):
     abort(404)
 
   # Lookup the tag's image and storage.
-  repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
+  repo_image = model.get_repo_image_extended(namespace, repository, tag_image.docker_image_id)
   if not repo_image:
     abort(404)
 
@@ -95,6 +101,9 @@ def get_squashed_tag(namespace, repository, tag):
       logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
       return redirect(download_url)
 
+    # Close the database handle here for this process before we send the long download.
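+    # Everything needed for the response (derived locations and layer path) is already in
+    # memory at this point, so the handle can be released before the slow send below.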
+    database.close_db_filter(None)
+
     logger.debug('Sending cached derived image %s', derived.uuid)
     return send_file(store.stream_read_file(derived.locations, derived_layer_path))
 
@@ -132,6 +141,9 @@ def get_squashed_tag(namespace, repository, tag):
     storage_args = (derived.uuid, derived.locations, storage_queue_file)
     QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
 
+    # Close the database handle here for this process before we send the long download.
+    database.close_db_filter(None)
+
     # Return the client's data.
     return send_file(client_queue_file)
 
diff --git a/tools/auditancestry.py b/tools/auditancestry.py
index 59d636836..27fd11d8c 100644
--- a/tools/auditancestry.py
+++ b/tools/auditancestry.py
@@ -27,7 +27,7 @@ bad_count = 0
 good_count = 0
 
 def resolve_or_create(repo, docker_image_id, new_ancestry):
-  existing = model.get_repo_image(repo.namespace_user.username, repo.name, docker_image_id)
+  existing = model.get_repo_image_extended(repo.namespace_user.username, repo.name, docker_image_id)
   if existing:
     logger.debug('Found existing image: %s, %s', existing.id, docker_image_id)
     return existing
@@ -63,7 +63,7 @@ def all_ancestors_exist(ancestors):
 cant_fix = []
 for img in query:
   try:
-    with_locations = model.get_repo_image(img.repository.namespace_user.username,
+    with_locations = model.get_repo_image_extended(img.repository.namespace_user.username,
                                           img.repository.name, img.docker_image_id)
     ancestry_storage = store.image_ancestry_path(img.storage.uuid)
     if store.exists(with_locations.storage.locations, ancestry_storage):
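
A minimal sketch of how the two new helpers in data/database.py are intended to be used, for reviewers skimming this patch (the worker function names below are illustrative only, not part of this change):

  from app import app
  from data import database

  def send_large_blob():
    # Drop the connection for the duration of a long non-SQL operation; the next
    # query after the block transparently reconnects.
    with database.CloseForLongOperation(app.config):
      stream_blob_to_client()  # hypothetical long-running I/O

  def finish_upload_in_worker():
    # In a freshly forked process: configure the connection, do a short burst of
    # SQL, then tear the connection down again.
    with database.UseThenDisconnect(app.config):
      mark_storage_uploaded()  # hypothetical model call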