Merge pull request #2254 from coreos-inc/georep-batch

Change georeplication queuing to use new batch system
This commit is contained in:
josephschorr 2016-12-22 14:09:09 -05:00 committed by GitHub
commit a9cced3282
2 changed files with 31 additions and 11 deletions

View file

@ -21,7 +21,7 @@ from image.docker import ManifestException
from image.docker.schema1 import DockerSchema1Manifest, DockerSchema1ManifestBuilder
from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES
from util.names import VALID_TAG_PATTERN
from util.registry.replication import queue_storage_replication
from util.registry.replication import queue_replication_batch
from util.validation import is_json
@ -197,11 +197,11 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest):
repo, storage_map = _write_manifest(namespace_name, repo_name, manifest)
# Queue all blob manifests for replication.
# TODO(jschorr): Find a way to optimize this insertion.
if features.STORAGE_REPLICATION:
for layer in manifest.layers:
digest_str = str(layer.digest)
queue_storage_replication(namespace_name, storage_map[digest_str])
with queue_replication_batch(namespace_name) as queue_storage_replication:
for layer in manifest.layers:
digest_str = str(layer.digest)
queue_storage_replication(storage_map[digest_str])
track_and_log('push_repo', repo, tag=manifest.tag)
spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]})

View file

@ -1,14 +1,34 @@
import features
import json
from contextlib import contextmanager
from data import model
from app import image_replication_queue
DEFAULT_BATCH_SIZE = 1000
@contextmanager
def queue_replication_batch(namespace, batch_size=DEFAULT_BATCH_SIZE):
"""
Context manager implementation which returns a target callable that takes the storage
to queue for replication. When the the context block exits the items generated by
the callable will be bulk inserted into the queue with the specified batch size.
"""
namespace_user = model.user.get_namespace_user(namespace)
with image_replication_queue.batch_insert(batch_size) as queue_put:
def queue_storage_replication_batch(storage):
if features.STORAGE_REPLICATION:
queue_put([storage.uuid], json.dumps({
'namespace_user_id': namespace_user.id,
'storage_id': storage.uuid,
}))
yield queue_storage_replication_batch
def queue_storage_replication(namespace, storage):
""" Queues replication for the given image storage under the given namespace (if enabled). """
if features.STORAGE_REPLICATION:
namespace_user = model.user.get_namespace_user(namespace)
image_replication_queue.put([storage.uuid], json.dumps({
'namespace_user_id': namespace_user.id,
'storage_id': storage.uuid,
}))
with queue_replication_batch(namespace, 1) as batch_spawn:
batch_spawn(storage)