Merge pull request #839 from jakedt/migratelogs

Backfill migrations in batches with tracked ids
This commit is contained in:
Jake Moshenko 2015-11-09 22:29:54 -05:00
commit 493d077f62
9 changed files with 348 additions and 71 deletions

View file

@ -64,5 +64,5 @@ level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View file

@ -56,3 +56,4 @@ toposort
rfc3987
pyjwkest
jsonpath-rw
bintrees

View file

@ -4,6 +4,7 @@ APScheduler==3.0.3
autobahn==0.9.3-3
Babel==1.3
beautifulsoup4==4.4.0
bintrees==2.0.2
blinker==1.3
boto==2.38.0
cachetools==1.0.3

View file

@ -0,0 +1,132 @@
import unittest
import logging
import random
from datetime import datetime, timedelta
from util.migrate.allocator import CompletedKeys, NoAvailableKeysError
class CompletedTestCase(unittest.TestCase):
def test_merge_blocks_operations(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertTrue(candidates.is_available(5))
self.assertTrue(candidates.is_available(0))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(4))
self.assertFalse(candidates.is_available(11))
self.assertFalse(candidates.is_available(10))
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(5, 6)
self.assertFalse(candidates.is_available(5))
self.assertTrue(candidates.is_available(6))
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(3, 8)
self.assertTrue(candidates.is_available(9))
self.assertTrue(candidates.is_available(8))
self.assertFalse(candidates.is_available(7))
self.assertEqual(1, len(candidates._slabs))
def test_adjust_max(self):
candidates = CompletedKeys(10)
self.assertEqual(0, len(candidates._slabs))
self.assertTrue(candidates.is_available(9))
candidates.mark_completed(5, 12)
self.assertEqual(0, len(candidates._slabs))
self.assertFalse(candidates.is_available(9))
self.assertTrue(candidates.is_available(4))
def test_adjust_min(self):
candidates = CompletedKeys(10)
self.assertEqual(0, len(candidates._slabs))
self.assertTrue(candidates.is_available(2))
candidates.mark_completed(0, 3)
self.assertEqual(0, len(candidates._slabs))
self.assertFalse(candidates.is_available(2))
self.assertTrue(candidates.is_available(4))
def test_inside_block(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 8)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(2, 5)
self.assertEqual(1, len(candidates._slabs))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(5))
def test_wrap_block(self):
candidates = CompletedKeys(10)
candidates.mark_completed(2, 5)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(1, 8)
self.assertEqual(1, len(candidates._slabs))
self.assertFalse(candidates.is_available(1))
self.assertFalse(candidates.is_available(5))
def test_non_contiguous(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertEqual(1, len(candidates._slabs))
self.assertTrue(candidates.is_available(5))
self.assertTrue(candidates.is_available(6))
candidates.mark_completed(6, 8)
self.assertEqual(2, len(candidates._slabs))
self.assertTrue(candidates.is_available(5))
self.assertFalse(candidates.is_available(6))
def test_big_merge(self):
candidates = CompletedKeys(10)
candidates.mark_completed(1, 5)
self.assertEqual(1, len(candidates._slabs))
candidates.mark_completed(6, 8)
self.assertEqual(2, len(candidates._slabs))
candidates.mark_completed(5, 6)
self.assertEqual(1, len(candidates._slabs))
def test_range_limits(self):
candidates = CompletedKeys(10)
self.assertFalse(candidates.is_available(-1))
self.assertFalse(candidates.is_available(10))
self.assertTrue(candidates.is_available(9))
self.assertTrue(candidates.is_available(0))
def test_random_saturation(self):
candidates = CompletedKeys(100)
with self.assertRaises(NoAvailableKeysError):
for _ in range(101):
start = candidates.get_block_start_index(10)
self.assertTrue(candidates.is_available(start))
candidates.mark_completed(start, start + 10)
def test_huge_dataset(self):
candidates = CompletedKeys(1024 * 1024)
start_time = datetime.now()
iterations = 0
with self.assertRaises(NoAvailableKeysError):
while (datetime.now() - start_time) < timedelta(seconds=10):
start = candidates.get_block_start_index(1024)
self.assertTrue(candidates.is_available(start))
candidates.mark_completed(start, start + random.randint(512, 1024))
iterations += 1
self.assertGreater(iterations, 1024)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()

View file

@ -2,7 +2,6 @@ import logging
from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
from random import shuffle
logger = logging.getLogger(__name__)
@ -21,56 +20,3 @@ class UTF8LongText(TypeDecorator):
return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
else:
return dialect.type_descriptor(Text())
def _chance_duplication(pop_size, samples):
""" The chance of randomly selecting a duplicate when you choose the specified number of samples
from the specified population size.
"""
pairs = (samples * (samples - 1)) / 2.0
unique = (pop_size - 1.0)/pop_size
all_unique = pow(unique, pairs)
return 1 - all_unique
def _num_checks(pop_size, desired):
""" Binary search for the proper number of entries to use to get the specified collision
probability.
"""
s_max = pop_size
s_min = 0
last_test = -1
s_test = s_max
while s_max > s_min and last_test != s_test:
last_test = s_test
s_test = (s_max + s_min)/2
chance = _chance_duplication(pop_size, s_test)
if chance > desired:
s_max = s_test - 1
else:
s_min = s_test
return s_test
def yield_random_entries(batch_query, batch_size, collision_chance):
""" This method will yield semi-random items from a query in a database friendly way until no
more items match the base query modifier. It will pull batches of batch_size from the query
and yield enough items from each batch so that concurrent workers have a reduced chance of
selecting the same items. For example, if your batches return 10,000 entries, and you desire
only a .03 collision_chance, we will only use 25 random entries before going back to the db
for a new batch.
"""
# Seed with some data which will pass the condition, but will be immediately discarded
all_candidates = [1]
while len(all_candidates) > 0:
all_candidates = list(batch_query().limit(batch_size))
shuffle(all_candidates)
num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
num_selections)
candidates = all_candidates[0:num_selections]
for candidate in candidates:
yield candidate

156
util/migrate/allocator.py Normal file
View file

@ -0,0 +1,156 @@
import logging
import random
from bintrees import RBTree
from threading import Event
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class NoAvailableKeysError(ValueError):
pass
class CompletedKeys(object):
def __init__(self, max_index):
self._max_index = max_index
self._min_index = 0
self._slabs = RBTree()
def _get_previous_or_none(self, index):
try:
return self._slabs.floor_item(index)
except KeyError:
return None
def is_available(self, index):
logger.debug('Testing index %s', index)
if index >= self._max_index or index < self._min_index:
logger.debug('Index out of range')
return False
try:
prev_start, prev_length = self._slabs.floor_item(index)
logger.debug('Prev range: %s-%s', prev_start, prev_start + prev_length)
return (prev_start + prev_length) <= index
except KeyError:
return True
def mark_completed(self, start_index, past_last_index):
logger.debug('Marking the range completed: %s-%s', start_index, past_last_index)
# Find the item directly before this and see if there is overlap
to_discard = set()
try:
prev_start, prev_length = self._slabs.floor_item(start_index)
if prev_start + prev_length >= start_index:
# we are going to merge with the range before us
logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length)
to_discard.add(prev_start)
start_index = prev_start
past_last_index = max(past_last_index, prev_start + prev_length)
except KeyError:
pass
# Find all keys between the start and last index and merge them into one block
for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
candidate_next_index = merge_start + merge_length
logger.debug('Merging with block %s-%s', merge_start, candidate_next_index)
to_discard.add(merge_start)
past_last_index = max(past_last_index, candidate_next_index)
# write the new block which is fully merged
discard = False
if past_last_index >= self._max_index:
logger.debug('Discarding block and setting new max to: %s', start_index)
self._max_index = start_index
discard = True
if start_index <= self._min_index:
logger.debug('Discarding block and setting new min to: %s', past_last_index)
self._min_index = past_last_index
discard = True
if to_discard:
logger.debug('Discarding %s obsolte blocks', len(to_discard))
self._slabs.remove_items(to_discard)
if not discard:
logger.debug('Writing new block with range: %s-%s', start_index, past_last_index)
self._slabs.insert(start_index, past_last_index - start_index)
logger.debug('Total blocks: %s', len(self._slabs))
def get_block_start_index(self, block_size_estimate):
logger.debug('Total range: %s-%s', self._min_index, self._max_index)
if self._max_index <= self._min_index:
raise NoAvailableKeysError('All indexes have been marked completed')
num_holes = len(self._slabs) + 1
random_hole = random.randint(0, num_holes - 1)
logger.debug('Selected random hole %s with %s total holes', random_hole, num_holes)
hole_start = self._min_index
past_hole_end = self._max_index
# Now that we have picked a hole, we need to define the bounds
if random_hole > 0:
# There will be a slab before this hole, find where it ends
bound_entries = self._slabs.nsmallest(random_hole + 1)[-2:]
left_index, left_len = bound_entries[0]
logger.debug('Left range %s-%s', left_index, left_index + left_len)
hole_start = left_index + left_len
if len(bound_entries) > 1:
right_index, right_len = bound_entries[1]
logger.debug('Right range %s-%s', right_index, right_index + right_len)
past_hole_end, _ = bound_entries[1]
elif not self._slabs.is_empty():
right_index, right_len = self._slabs.nsmallest(1)[0]
logger.debug('Right range %s-%s', right_index, right_index + right_len)
past_hole_end, _ = self._slabs.nsmallest(1)[0]
# Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
logger.debug('Selecting from hole range: %s-%s', hole_start, past_hole_end)
rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
logger.debug('Rand max bound: %s', rand_max_bound)
return random.randint(hole_start, rand_max_bound)
def yield_random_entries(batch_query, primary_key_field, batch_size, max_id):
""" This method will yield items from random blocks in the database. We will track metadata
about which keys are available for work, and we will complete the backfill when there is no
more work to be done. The method yields tupes of (candidate, Event), and if the work was
already done by another worker, the caller should set the event. Batch candidates must have
an "id" field which can be inspected.
"""
allocator = CompletedKeys(max_id + 1)
try:
while True:
start_index = allocator.get_block_start_index(batch_size)
all_candidates = list(batch_query()
.limit(batch_size)
.where(primary_key_field >= start_index))
if len(all_candidates) == 0:
logger.debug('No candidates, new highest id: %s', start_index)
allocator.mark_completed(start_index, max_id)
continue
logger.debug('Found %s candidates, processing block')
for candidate in all_candidates:
abort_early = Event()
yield candidate, abort_early
if abort_early.is_set():
logger.debug('Overlap with another worker, aborting')
break
completed_through = candidate.id + 1
logger.debug('Marking id range as completed: %s-%s', start_index, completed_through)
allocator.mark_completed(start_index, completed_through)
except NoAvailableKeysError:
logger.debug('No more work')

View file

@ -3,12 +3,12 @@ import logging
from peewee import JOIN_LEFT_OUTER
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField)
TextField, fn)
from data.database import BaseModel, db, db_for_update, CloseForLongOperation
from app import app, storage
from digest import checksums
from util.migrate import yield_random_entries
from util.migrate.allocator import yield_random_entries
logger = logging.getLogger(__name__)
@ -76,7 +76,9 @@ def backfill_content_checksums():
.select(ImageStorage.id, ImageStorage.uuid)
.where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))
for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id):
logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
locations = _get_image_storage_locations(candidate_storage.id)
@ -97,12 +99,13 @@ def backfill_content_checksums():
to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
if to_update.content_checksum is not None:
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
abort.set()
else:
logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
to_update.content_checksum = checksum
to_update.save()
if __name__ == "__main__":
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
# logging.getLogger('peewee').setLevel(logging.CRITICAL)
backfill_content_checksums()

View file

@ -1,8 +1,41 @@
import logging
from data.database import Image, ImageStorage, db, db_for_update
from data.database import BaseModel, db, db_for_update
from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField,
TextField, IntegerField)
from app import app
from util.migrate import yield_random_entries
from util.migrate.allocator import yield_random_entries
class Repository(BaseModel):
pass
# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
uuid = CharField(index=True, unique=True)
checksum = CharField(null=True)
image_size = BigIntegerField(null=True)
uncompressed_size = BigIntegerField(null=True)
uploading = BooleanField(default=True, null=True)
cas_path = BooleanField(default=True)
content_checksum = CharField(null=True, index=True)
class Image(BaseModel):
docker_image_id = CharField(index=True)
repository = ForeignKeyField(Repository)
ancestors = CharField(index=True, default='/', max_length=64535, null=True)
storage = ForeignKeyField(ImageStorage, index=True, null=True)
created = DateTimeField(null=True)
comment = TextField(null=True)
command = TextField(null=True)
aggregate_size = BigIntegerField(null=True)
v1_json_metadata = TextField(null=True)
v1_checksum = CharField(null=True)
security_indexed = BooleanField(default=False)
security_indexed_engine = IntegerField(default=-1)
parent_id = IntegerField(index=True, null=True)
logger = logging.getLogger(__name__)
@ -21,20 +54,23 @@ def backfill_parent_id():
.where(Image.parent_id >> None, Image.ancestors != '/',
ImageStorage.uploading == False))
for to_backfill in yield_random_entries(fetch_batch, 10000, 0.3):
max_id = Image.select(fn.Max(Image.id)).scalar()
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id):
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select()
.where(Image.id == to_backfill.id)).get()
.where(Image.id == to_backfill.id, Image.parent_id >> None)).get()
image.parent_id = int(to_backfill.ancestors.split('/')[-2])
image.save()
except Image.DoesNotExist:
pass
logger.info('Collision with another worker, aborting batch')
abort.set()
logger.debug('backfill_parent_id: Completed')
if __name__ == "__main__":
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('peewee').setLevel(logging.CRITICAL)

View file

@ -1,9 +1,9 @@
import logging
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField)
TextField, fn)
from data.database import BaseModel, db, db_for_update
from util.migrate import yield_random_entries
from util.migrate.allocator import yield_random_entries
from app import app
@ -48,20 +48,22 @@ def backfill_checksums():
.where(Image.v1_checksum >> None, ImageStorage.uploading == False,
~(ImageStorage.checksum >> None)))
for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
logger.debug('Computing content checksum for storage: %s', candidate_image.id)
max_id = Image.select(fn.Max(Image.id)).scalar()
for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id):
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.id == candidate_image.id)).get()
.where(Image.id == candidate_image.id,
Image.v1_checksum >> None)).get()
image.v1_checksum = image.storage.checksum
image.save()
except Image.DoesNotExist:
pass
logger.info('Collision with another worker, aborting batch')
abort.set()
if __name__ == "__main__":