Merge pull request #2452 from coreos-inc/storage-rep-fixes
Small improvements to storage replication code
This commit is contained in:
commit
a204db1ecc
2 changed files with 15 additions and 4 deletions
|
@ -1,7 +1,9 @@
|
||||||
import logging
|
import logging
|
||||||
import features
|
import features
|
||||||
|
|
||||||
|
from app import storage, image_replication_queue
|
||||||
from data.database import Image, ImageStorage, Repository, User
|
from data.database import Image, ImageStorage, Repository, User
|
||||||
|
from data import model
|
||||||
from util.registry.replication import queue_storage_replication
|
from util.registry.replication import queue_storage_replication
|
||||||
|
|
||||||
def backfill_replication():
|
def backfill_replication():
|
||||||
|
@ -17,9 +19,16 @@ def backfill_replication():
|
||||||
if image.storage.uuid in encountered:
|
if image.storage.uuid in encountered:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
print "Enqueueing image storage %s to be replicated" % (image.storage.uuid)
|
namespace = image.repository.namespace_user.username
|
||||||
encountered.add(image.storage.uuid)
|
locations = model.user.get_region_locations(namespace)
|
||||||
queue_storage_replication(image.repository.namespace_user.username, image.storage)
|
locations_required = locations | set(storage.default_locations)
|
||||||
|
locations_missing = locations_required - set(image.storage.locations)
|
||||||
|
if locations_missing:
|
||||||
|
print "Enqueueing image storage %s to be replicated" % (image.storage.uuid)
|
||||||
|
encountered.add(image.storage.uuid)
|
||||||
|
|
||||||
|
if not image_replication_queue.alive([image.storage.uuid]):
|
||||||
|
queue_storage_replication(image.repository.namespace_user.username, image.storage)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -10,6 +10,7 @@ from workers.queueworker import QueueWorker, WorkerUnhealthyException
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
POLL_PERIOD_SECONDS = 10
|
POLL_PERIOD_SECONDS = 10
|
||||||
|
RESERVATION_SECONDS = app.config.get('STORAGE_REPLICATION_PROCESSING_SECONDS', 60*20)
|
||||||
|
|
||||||
class StorageReplicationWorker(QueueWorker):
|
class StorageReplicationWorker(QueueWorker):
|
||||||
def process_queue_item(self, job_details):
|
def process_queue_item(self, job_details):
|
||||||
|
@ -97,5 +98,6 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
logger.debug('Starting replication worker')
|
logger.debug('Starting replication worker')
|
||||||
worker = StorageReplicationWorker(image_replication_queue,
|
worker = StorageReplicationWorker(image_replication_queue,
|
||||||
poll_period_seconds=POLL_PERIOD_SECONDS)
|
poll_period_seconds=POLL_PERIOD_SECONDS,
|
||||||
|
reservation_seconds=RESERVATION_SECONDS)
|
||||||
worker.start()
|
worker.start()
|
||||||
|
|
Reference in a new issue