From 9413e25123721c189bafa68ed3e85ecd57e13f68 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 21 Dec 2016 12:54:50 -0500 Subject: [PATCH] Change georeplication queuing to use new batch system --- endpoints/v2/manifest.py | 10 +++++----- util/registry/replication.py | 32 ++++++++++++++++++++++++++------ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 2cd80bec7..a4155add2 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -21,7 +21,7 @@ from image.docker import ManifestException from image.docker.schema1 import DockerSchema1Manifest, DockerSchema1ManifestBuilder from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES from util.names import VALID_TAG_PATTERN -from util.registry.replication import queue_storage_replication +from util.registry.replication import queue_replication_batch from util.validation import is_json @@ -197,11 +197,11 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest): repo, storage_map = _write_manifest(namespace_name, repo_name, manifest) # Queue all blob manifests for replication. - # TODO(jschorr): Find a way to optimize this insertion. 
if features.STORAGE_REPLICATION: - for layer in manifest.layers: - digest_str = str(layer.digest) - queue_storage_replication(namespace_name, storage_map[digest_str]) + with queue_replication_batch(namespace_name) as queue_storage_replication: + for layer in manifest.layers: + digest_str = str(layer.digest) + queue_storage_replication(storage_map[digest_str]) track_and_log('push_repo', repo, tag=manifest.tag) spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]}) diff --git a/util/registry/replication.py b/util/registry/replication.py index b20be90f8..f9eca981c 100644 --- a/util/registry/replication.py +++ b/util/registry/replication.py @@ -1,14 +1,34 @@ import features import json + +from contextlib import contextmanager from data import model from app import image_replication_queue +DEFAULT_BATCH_SIZE = 1000 + +@contextmanager +def queue_replication_batch(namespace, batch_size=DEFAULT_BATCH_SIZE): + """ + Context manager implementation which returns a target callable that takes the storage + to queue for replication. When the context block exits the items generated by + the callable will be bulk inserted into the queue with the specified batch size. + """ + namespace_user = model.user.get_namespace_user(namespace) + + with image_replication_queue.batch_insert(batch_size) as queue_put: + def queue_storage_replication_batch(storage): + if features.STORAGE_REPLICATION: + queue_put([storage.uuid], json.dumps({ + 'namespace_user_id': namespace_user.id, + 'storage_id': storage.uuid, + })) + + yield queue_storage_replication_batch + + def queue_storage_replication(namespace, storage): """ Queues replication for the given image storage under the given namespace (if enabled). 
""" - if features.STORAGE_REPLICATION: - namespace_user = model.user.get_namespace_user(namespace) - image_replication_queue.put([storage.uuid], json.dumps({ - 'namespace_user_id': namespace_user.id, - 'storage_id': storage.uuid, - })) + with queue_replication_batch(namespace, 1) as batch_spawn: + batch_spawn(storage)