Change georeplication queuing to use new batch system
This commit is contained in:
		
							parent
							
								
									732ab67b57
								
							
						
					
					
						commit
						9413e25123
					
				
					 2 changed files with 31 additions and 11 deletions
				
			
		|  | @ -21,7 +21,7 @@ from image.docker import ManifestException | ||||||
| from image.docker.schema1 import DockerSchema1Manifest, DockerSchema1ManifestBuilder | from image.docker.schema1 import DockerSchema1Manifest, DockerSchema1ManifestBuilder | ||||||
| from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES | from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES | ||||||
| from util.names import VALID_TAG_PATTERN | from util.names import VALID_TAG_PATTERN | ||||||
| from util.registry.replication import queue_storage_replication | from util.registry.replication import queue_replication_batch | ||||||
| from util.validation import is_json | from util.validation import is_json | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -197,11 +197,11 @@ def _write_manifest_and_log(namespace_name, repo_name, manifest): | ||||||
|   repo, storage_map = _write_manifest(namespace_name, repo_name, manifest) |   repo, storage_map = _write_manifest(namespace_name, repo_name, manifest) | ||||||
| 
 | 
 | ||||||
|   # Queue all blob manifests for replication. |   # Queue all blob manifests for replication. | ||||||
|   # TODO(jschorr): Find a way to optimize this insertion. |  | ||||||
|   if features.STORAGE_REPLICATION: |   if features.STORAGE_REPLICATION: | ||||||
|     for layer in manifest.layers: |     with queue_replication_batch(namespace_name) as queue_storage_replication: | ||||||
|       digest_str = str(layer.digest) |       for layer in manifest.layers: | ||||||
|       queue_storage_replication(namespace_name, storage_map[digest_str]) |         digest_str = str(layer.digest) | ||||||
|  |         queue_storage_replication(storage_map[digest_str]) | ||||||
| 
 | 
 | ||||||
|   track_and_log('push_repo', repo, tag=manifest.tag) |   track_and_log('push_repo', repo, tag=manifest.tag) | ||||||
|   spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]}) |   spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]}) | ||||||
|  |  | ||||||
|  | @ -1,14 +1,34 @@ | ||||||
| import features | import features | ||||||
| import json | import json | ||||||
|  | 
 | ||||||
|  | from contextlib import contextmanager | ||||||
| from data import model | from data import model | ||||||
| 
 | 
 | ||||||
| from app import image_replication_queue | from app import image_replication_queue | ||||||
| 
 | 
 | ||||||
|  | DEFAULT_BATCH_SIZE = 1000 | ||||||
|  | 
 | ||||||
|  | @contextmanager | ||||||
|  | def queue_replication_batch(namespace, batch_size=DEFAULT_BATCH_SIZE): | ||||||
|  |   """ | ||||||
|  |   Context manager implementation which returns a target callable that takes the storage | ||||||
|  |   to queue for replication. When the the context block exits the items generated by | ||||||
|  |   the callable will be bulk inserted into the queue with the specified batch size. | ||||||
|  |   """ | ||||||
|  |   namespace_user = model.user.get_namespace_user(namespace) | ||||||
|  | 
 | ||||||
|  |   with image_replication_queue.batch_insert(batch_size) as queue_put: | ||||||
|  |     def queue_storage_replication_batch(storage): | ||||||
|  |       if features.STORAGE_REPLICATION: | ||||||
|  |         queue_put([storage.uuid], json.dumps({ | ||||||
|  |           'namespace_user_id': namespace_user.id, | ||||||
|  |           'storage_id': storage.uuid, | ||||||
|  |         })) | ||||||
|  | 
 | ||||||
|  |     yield queue_storage_replication_batch | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def queue_storage_replication(namespace, storage): | def queue_storage_replication(namespace, storage): | ||||||
|   """ Queues replication for the given image storage under the given namespace (if enabled). """ |   """ Queues replication for the given image storage under the given namespace (if enabled). """ | ||||||
|   if features.STORAGE_REPLICATION: |   with queue_replication_batch(namespace, 1) as batch_spawn: | ||||||
|     namespace_user = model.user.get_namespace_user(namespace) |     batch_spawn(storage) | ||||||
|     image_replication_queue.put([storage.uuid], json.dumps({ |  | ||||||
|       'namespace_user_id': namespace_user.id, |  | ||||||
|       'storage_id': storage.uuid, |  | ||||||
|     })) |  | ||||||
|  |  | ||||||
		Reference in a new issue