parent e826b14ca4
commit dc24e8b1a1

8 changed files with 347 additions and 70 deletions
@@ -56,3 +56,4 @@ toposort
 rfc3987
 pyjwkest
 jsonpath-rw
+bintrees
@@ -4,6 +4,7 @@ APScheduler==3.0.3
 autobahn==0.9.3-3
 Babel==1.3
 beautifulsoup4==4.4.0
+bintrees==2.0.2
 blinker==1.3
 boto==2.38.0
 cachetools==1.0.3
test/test_backfill_allocator.py (new file, 132 lines)
@@ -0,0 +1,132 @@
+import unittest
+import logging
+import random
+
+from datetime import datetime, timedelta
+from util.migrate.allocator import CompletedKeys, NoAvailableKeysError
+
+
+class CompletedTestCase(unittest.TestCase):
+  def test_merge_blocks_operations(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(1, 5)
+
+    self.assertTrue(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(0))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(4))
+    self.assertFalse(candidates.is_available(11))
+    self.assertFalse(candidates.is_available(10))
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(5, 6)
+    self.assertFalse(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(6))
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(3, 8)
+    self.assertTrue(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(8))
+    self.assertFalse(candidates.is_available(7))
+    self.assertEqual(1, len(candidates._slabs))
+
+  def test_adjust_max(self):
+    candidates = CompletedKeys(10)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertTrue(candidates.is_available(9))
+    candidates.mark_completed(5, 12)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertFalse(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(4))
+
+  def test_adjust_min(self):
+    candidates = CompletedKeys(10)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertTrue(candidates.is_available(2))
+    candidates.mark_completed(0, 3)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertFalse(candidates.is_available(2))
+    self.assertTrue(candidates.is_available(4))
+
+  def test_inside_block(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(1, 8)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(2, 5)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(5))
+
+  def test_wrap_block(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(2, 5)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(1, 8)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(5))
+
+  def test_non_contiguous(self):
+    candidates = CompletedKeys(10)
+
+    candidates.mark_completed(1, 5)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertTrue(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(6))
+
+    candidates.mark_completed(6, 8)
+    self.assertEqual(2, len(candidates._slabs))
+    self.assertTrue(candidates.is_available(5))
+    self.assertFalse(candidates.is_available(6))
+
+  def test_big_merge(self):
+    candidates = CompletedKeys(10)
+
+    candidates.mark_completed(1, 5)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(6, 8)
+    self.assertEqual(2, len(candidates._slabs))
+
+    candidates.mark_completed(5, 6)
+    self.assertEqual(1, len(candidates._slabs))
+
+  def test_range_limits(self):
+    candidates = CompletedKeys(10)
+    self.assertFalse(candidates.is_available(-1))
+    self.assertFalse(candidates.is_available(10))
+
+    self.assertTrue(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(0))
+
+  def test_random_saturation(self):
+    candidates = CompletedKeys(100)
+    with self.assertRaises(NoAvailableKeysError):
+      for _ in range(101):
+        start = candidates.get_block_start_index(10)
+        self.assertTrue(candidates.is_available(start))
+        candidates.mark_completed(start, start + 10)
+
+  def test_huge_dataset(self):
+    candidates = CompletedKeys(1024 * 1024)
+    start_time = datetime.now()
+    iterations = 0
+    with self.assertRaises(NoAvailableKeysError):
+      while (datetime.now() - start_time) < timedelta(seconds=10):
+        start = candidates.get_block_start_index(1024)
+        self.assertTrue(candidates.is_available(start))
+        candidates.mark_completed(start, start + random.randint(512, 1024))
+        iterations += 1
+
+    self.assertGreater(iterations, 1024)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.DEBUG)
+  unittest.main()
@@ -2,7 +2,6 @@ import logging
 
 from sqlalchemy.types import TypeDecorator, Text
 from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
-from random import shuffle
 
 
 logger = logging.getLogger(__name__)
@@ -21,56 +20,3 @@ class UTF8LongText(TypeDecorator):
       return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
     else:
       return dialect.type_descriptor(Text())
-
-
-def _chance_duplication(pop_size, samples):
-  """ The chance of randomly selecting a duplicate when you choose the specified number of samples
-      from the specified population size.
-  """
-  pairs = (samples * (samples - 1)) / 2.0
-  unique = (pop_size - 1.0)/pop_size
-  all_unique = pow(unique, pairs)
-  return 1 - all_unique
-
-
-def _num_checks(pop_size, desired):
-  """ Binary search for the proper number of entries to use to get the specified collision
-      probability.
-  """
-  s_max = pop_size
-  s_min = 0
-  last_test = -1
-  s_test = s_max
-
-  while s_max > s_min and last_test != s_test:
-    last_test = s_test
-    s_test = (s_max + s_min)/2
-    chance = _chance_duplication(pop_size, s_test)
-    if chance > desired:
-      s_max = s_test - 1
-    else:
-      s_min = s_test
-
-  return s_test
-
-
-def yield_random_entries(batch_query, batch_size, collision_chance):
-  """ This method will yield semi-random items from a query in a database friendly way until no
-      more items match the base query modifier. It will pull batches of batch_size from the query
-      and yield enough items from each batch so that concurrent workers have a reduced chance of
-      selecting the same items. For example, if your batches return 10,000 entries, and you desire
-      only a .03 collision_chance, we will only use 25 random entries before going back to the db
-      for a new batch.
-  """
-
-  # Seed with some data which will pass the condition, but will be immediately discarded
-  all_candidates = [1]
-  while len(all_candidates) > 0:
-    all_candidates = list(batch_query().limit(batch_size))
-    shuffle(all_candidates)
-    num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
-    logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
-                 num_selections)
-    candidates = all_candidates[0:num_selections]
-    for candidate in candidates:
-      yield candidate
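For reference, the collision estimate encoded in the removed helpers is the birthday-style bound 1 - ((N - 1)/N)^(k(k-1)/2) for k samples drawn from a population of N. A quick standalone check (not part of this commit) reproduces the figure quoted in the old docstring:

  # Chance that at least two of k samples collide when drawn from N candidates.
  def chance_duplication(pop_size, samples):
    pairs = (samples * (samples - 1)) / 2.0
    return 1 - ((pop_size - 1.0) / pop_size) ** pairs

  print(chance_duplication(10000, 25))  # roughly 0.03, matching the old docstring's example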
util/migrate/allocator.py (new file, 156 lines)
@@ -0,0 +1,156 @@
+import logging
+import random
+
+from bintrees import RBTree
+from threading import Event
+
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+class NoAvailableKeysError(ValueError):
+  pass
+
+
+class CompletedKeys(object):
+  def __init__(self, max_index):
+    self._max_index = max_index
+    self._min_index = 0
+    self._slabs = RBTree()
+
+  def _get_previous_or_none(self, index):
+    try:
+      return self._slabs.floor_item(index)
+    except KeyError:
+      return None
+
+  def is_available(self, index):
+    logger.debug('Testing index %s', index)
+    if index >= self._max_index or index < self._min_index:
+      logger.debug('Index out of range')
+      return False
+
+    try:
+      prev_start, prev_length = self._slabs.floor_item(index)
+      logger.debug('Prev range: %s-%s', prev_start, prev_start + prev_length)
+      return (prev_start + prev_length) <= index
+    except KeyError:
+      return True
+
+  def mark_completed(self, start_index, past_last_index):
+    logger.debug('Marking the range completed: %s-%s', start_index, past_last_index)
+    # Find the item directly before this and see if there is overlap
+    to_discard = set()
+    try:
+      prev_start, prev_length = self._slabs.floor_item(start_index)
+      if prev_start + prev_length >= start_index:
+        # we are going to merge with the range before us
+        logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length)
+        to_discard.add(prev_start)
+        start_index = prev_start
+        past_last_index = max(past_last_index, prev_start + prev_length)
+    except KeyError:
+      pass
+
+    # Find all keys between the start and last index and merge them into one block
+    for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
+      candidate_next_index = merge_start + merge_length
+      logger.debug('Merging with block %s-%s', merge_start, candidate_next_index)
+      to_discard.add(merge_start)
+      past_last_index = max(past_last_index, candidate_next_index)
+
+    # write the new block which is fully merged
+    discard = False
+    if past_last_index >= self._max_index:
+      logger.debug('Discarding block and setting new max to: %s', start_index)
+      self._max_index = start_index
+      discard = True
+
+    if start_index <= self._min_index:
+      logger.debug('Discarding block and setting new min to: %s', past_last_index)
+      self._min_index = past_last_index
+      discard = True
+
+    if to_discard:
+      logger.debug('Discarding %s obsolete blocks', len(to_discard))
+      self._slabs.remove_items(to_discard)
+
+    if not discard:
+      logger.debug('Writing new block with range: %s-%s', start_index, past_last_index)
+      self._slabs.insert(start_index, past_last_index - start_index)
+
+    logger.debug('Total blocks: %s', len(self._slabs))
+
+  def get_block_start_index(self, block_size_estimate):
+    logger.debug('Total range: %s-%s', self._min_index, self._max_index)
+    if self._max_index <= self._min_index:
+      raise NoAvailableKeysError('All indexes have been marked completed')
+
+    num_holes = len(self._slabs) + 1
+    random_hole = random.randint(0, num_holes - 1)
+    logger.debug('Selected random hole %s with %s total holes', random_hole, num_holes)
+
+    hole_start = self._min_index
+    past_hole_end = self._max_index
+
+    # Now that we have picked a hole, we need to define the bounds
+    if random_hole > 0:
+      # There will be a slab before this hole, find where it ends
+      bound_entries = self._slabs.nsmallest(random_hole + 1)[-2:]
+      left_index, left_len = bound_entries[0]
+      logger.debug('Left range %s-%s', left_index, left_index + left_len)
+      hole_start = left_index + left_len
+
+      if len(bound_entries) > 1:
+        right_index, right_len = bound_entries[1]
+        logger.debug('Right range %s-%s', right_index, right_index + right_len)
+        past_hole_end, _ = bound_entries[1]
+    elif not self._slabs.is_empty():
+      right_index, right_len = self._slabs.nsmallest(1)[0]
+      logger.debug('Right range %s-%s', right_index, right_index + right_len)
+      past_hole_end, _ = self._slabs.nsmallest(1)[0]
+
+    # Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
+    logger.debug('Selecting from hole range: %s-%s', hole_start, past_hole_end)
+    rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
+    logger.debug('Rand max bound: %s', rand_max_bound)
+    return random.randint(hole_start, rand_max_bound)
+
+
+def yield_random_entries(batch_query, primary_key_field, batch_size, max_id):
+  """ This method will yield items from random blocks in the database. We will track metadata
+      about which keys are available for work, and we will complete the backfill when there is no
+      more work to be done. The method yields tuples of (candidate, Event), and if the work was
+      already done by another worker, the caller should set the event. Batch candidates must have
+      an "id" field which can be inspected.
+  """
+
+  allocator = CompletedKeys(max_id + 1)
+
+  try:
+    while True:
+      start_index = allocator.get_block_start_index(batch_size)
+      all_candidates = list(batch_query()
+                            .limit(batch_size)
+                            .where(primary_key_field >= start_index))
+
+      if len(all_candidates) == 0:
+        logger.debug('No candidates, new highest id: %s', start_index)
+        allocator.mark_completed(start_index, max_id)
+        continue
+
+      logger.debug('Found %s candidates, processing block', len(all_candidates))
+      for candidate in all_candidates:
+        abort_early = Event()
+        yield candidate, abort_early
+        if abort_early.is_set():
+          logger.debug('Overlap with another worker, aborting')
+          break
+
+        completed_through = candidate.id + 1
+        logger.debug('Marking id range as completed: %s-%s', start_index, completed_through)
+        allocator.mark_completed(start_index, completed_through)
+
+  except NoAvailableKeysError:
+    logger.debug('No more work')
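The backfill scripts changed below all consume this generator the same way: compute the current maximum id, iterate over (candidate, Event) pairs, and set the event when another worker has already filled in the row. A minimal sketch of that calling pattern, using a hypothetical peewee model MyTable with a nullable value column (the model and its columns are illustrative, not part of this commit):

  from peewee import SqliteDatabase, Model, CharField, fn
  from util.migrate.allocator import yield_random_entries

  # Hypothetical table used only to illustrate the calling convention.
  db = SqliteDatabase(':memory:')

  class MyTable(Model):
    value = CharField(null=True)

    class Meta:
      database = db

  def backfill_example():
    # Query for rows still needing work; it must select the integer primary key.
    def batch_query():
      return MyTable.select(MyTable.id).where(MyTable.value >> None)

    max_id = MyTable.select(fn.Max(MyTable.id)).scalar()

    for candidate, abort_early in yield_random_entries(batch_query, MyTable.id, 1000, max_id):
      row = MyTable.get(MyTable.id == candidate.id)
      if row.value is not None:
        # Another worker already completed this row; tell the allocator to pick a new block.
        abort_early.set()
        continue

      row.value = 'backfilled'
      row.save()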
@@ -3,12 +3,12 @@ import logging
 from peewee import JOIN_LEFT_OUTER
 
 from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
-                    TextField)
+                    TextField, fn)
 
 from data.database import BaseModel, db, db_for_update, CloseForLongOperation
 from app import app, storage
 from digest import checksums
-from util.migrate import yield_random_entries
+from util.migrate.allocator import yield_random_entries
 
 
 logger = logging.getLogger(__name__)
@@ -76,7 +76,9 @@ def backfill_content_checksums():
                  .select(ImageStorage.id, ImageStorage.uuid)
                  .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))
 
-  for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
+  max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
+
+  for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id):
     logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
 
     locations = _get_image_storage_locations(candidate_storage.id)
@@ -97,12 +99,13 @@ def backfill_content_checksums():
       to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
       if to_update.content_checksum is not None:
        logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
+        abort.set()
      else:
        logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
        to_update.content_checksum = checksum
        to_update.save()
 
-if __name__ == "__main__":
+if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG)
   # logging.getLogger('peewee').setLevel(logging.CRITICAL)
   backfill_content_checksums()
@@ -1,8 +1,41 @@
 import logging
 
-from data.database import Image, ImageStorage, db, db_for_update
+from data.database import BaseModel, db, db_for_update
+from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField,
+                    TextField, IntegerField)
 from app import app
-from util.migrate import yield_random_entries
+from util.migrate.allocator import yield_random_entries
+
+
+class Repository(BaseModel):
+  pass
+
+
+# Vendor the information from tables we will be writing to at the time of this migration
+class ImageStorage(BaseModel):
+  uuid = CharField(index=True, unique=True)
+  checksum = CharField(null=True)
+  image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
+  uploading = BooleanField(default=True, null=True)
+  cas_path = BooleanField(default=True)
+  content_checksum = CharField(null=True, index=True)
+
+
+class Image(BaseModel):
+  docker_image_id = CharField(index=True)
+  repository = ForeignKeyField(Repository)
+  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
+  storage = ForeignKeyField(ImageStorage, index=True, null=True)
+  created = DateTimeField(null=True)
+  comment = TextField(null=True)
+  command = TextField(null=True)
+  aggregate_size = BigIntegerField(null=True)
+  v1_json_metadata = TextField(null=True)
+  v1_checksum = CharField(null=True)
+
+  security_indexed = BooleanField(default=False)
+  security_indexed_engine = IntegerField(default=-1)
+  parent_id = IntegerField(index=True, null=True)
+
+
 logger = logging.getLogger(__name__)
@@ -21,20 +54,23 @@ def backfill_parent_id():
                          .where(Image.parent_id >> None, Image.ancestors != '/',
                                 ImageStorage.uploading == False))
 
-  for to_backfill in yield_random_entries(fetch_batch, 10000, 0.3):
+  max_id = Image.select(fn.Max(Image.id)).scalar()
+
+  for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id):
     with app.config['DB_TRANSACTION_FACTORY'](db):
       try:
         image = db_for_update(Image
                               .select()
-                              .where(Image.id == to_backfill.id)).get()
+                              .where(Image.id == to_backfill.id, Image.parent_id >> None)).get()
         image.parent_id = int(to_backfill.ancestors.split('/')[-2])
         image.save()
       except Image.DoesNotExist:
-        pass
+        logger.info('Collision with another worker, aborting batch')
+        abort.set()
 
   logger.debug('backfill_parent_id: Completed')
 
-if __name__ == "__main__":
+if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG)
   logging.getLogger('peewee').setLevel(logging.CRITICAL)
 
@@ -1,9 +1,9 @@
 import logging
 
 from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
-                    TextField)
+                    TextField, fn)
 from data.database import BaseModel, db, db_for_update
-from util.migrate import yield_random_entries
+from util.migrate.allocator import yield_random_entries
 from app import app
 
 
@@ -48,20 +48,22 @@ def backfill_checksums():
                  .where(Image.v1_checksum >> None, ImageStorage.uploading == False,
                         ~(ImageStorage.checksum >> None)))
 
-  for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
-    logger.debug('Computing content checksum for storage: %s', candidate_image.id)
+  max_id = Image.select(fn.Max(Image.id)).scalar()
+
+  for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id):
     with app.config['DB_TRANSACTION_FACTORY'](db):
      try:
        image = db_for_update(Image
                              .select(Image, ImageStorage)
                              .join(ImageStorage)
-                              .where(Image.id == candidate_image.id)).get()
+                              .where(Image.id == candidate_image.id,
+                                     Image.v1_checksum >> None)).get()
+
        image.v1_checksum = image.storage.checksum
        image.save()
      except Image.DoesNotExist:
-        pass
+        logger.info('Collision with another worker, aborting batch')
+        abort.set()
 
 
 if __name__ == "__main__":