Read the number of unscanned clair images from the block allocator

This commit is contained in:
Jake Moshenko 2017-02-21 19:13:51 -05:00
parent 0214b1ba9f
commit b03e03c389
5 changed files with 61 additions and 15 deletions

View file

@ -17,6 +17,7 @@ class CompletedKeys(object):
# Initialise completion tracking for the index range bounded by min_index and max_index.
def __init__(self, max_index, min_index=0):
# Range bounds; mark_completed uses them to clamp the count of newly completed indices.
self._max_index = max_index
self._min_index = min_index
# Number of indices not yet marked completed: starts at the full width of the
# range and is decremented by mark_completed as ranges are recorded.
self.num_remaining = max_index - min_index
# Tree of completed blocks; mark_completed inserts entries keyed by block start
# index with the block length as the value.
self._slabs = RBTree()
def _get_previous_or_none(self, index):
@ -40,14 +41,18 @@ class CompletedKeys(object):
def mark_completed(self, start_index, past_last_index):
logger.debug('Marking the range completed: %s-%s', start_index, past_last_index)
num_completed = min(past_last_index, self._max_index) - max(start_index, self._min_index)
# Find the item directly before this and see if there is overlap
to_discard = set()
try:
prev_start, prev_length = self._slabs.floor_item(start_index)
if prev_start + prev_length >= start_index:
max_prev_completed = prev_start + prev_length
if max_prev_completed >= start_index:
# we are going to merge with the range before us
logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length)
to_discard.add(prev_start)
num_completed = max(num_completed - (max_prev_completed - start_index), 0)
start_index = prev_start
past_last_index = max(past_last_index, prev_start + prev_length)
except KeyError:
@ -55,8 +60,13 @@ class CompletedKeys(object):
# Find all keys between the start and last index and merge them into one block
for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
if merge_start in to_discard:
logger.debug('Already merged with block %s-%s', merge_start, merge_start + merge_length)
continue
candidate_next_index = merge_start + merge_length
logger.debug('Merging with block %s-%s', merge_start, candidate_next_index)
num_completed -= merge_length - max(candidate_next_index - past_last_index, 0)
to_discard.add(merge_start)
past_last_index = max(past_last_index, candidate_next_index)
@ -73,13 +83,16 @@ class CompletedKeys(object):
discard = True
if to_discard:
logger.debug('Discarding %s obsolte blocks', len(to_discard))
logger.debug('Discarding %s obsolete blocks', len(to_discard))
self._slabs.remove_items(to_discard)
if not discard:
logger.debug('Writing new block with range: %s-%s', start_index, past_last_index)
self._slabs.insert(start_index, past_last_index - start_index)
# Update the number of remaining items with the adjustments we've made
assert num_completed >= 0
self.num_remaining -= num_completed
logger.debug('Total blocks: %s', len(self._slabs))
def get_block_start_index(self, block_size_estimate):
@ -145,9 +158,11 @@ def yield_random_entries(batch_query, primary_key_field, batch_size, max_id, min
continue
logger.info('Found %s candidates, processing block', len(all_candidates))
batch_completed = 0
for candidate in all_candidates:
abort_early = Event()
yield candidate, abort_early
yield candidate, abort_early, allocator.num_remaining - batch_completed
batch_completed += 1
if abort_early.is_set():
logger.info('Overlap with another worker, aborting')
break