import logging

from random import shuffle

from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.dialects.mysql import LONGTEXT

logger = logging.getLogger(__name__)


class UTF8LongText(TypeDecorator):
    """ Platform-independent UTF-8 LONGTEXT type.

        Uses MySQL's LONGTEXT with the utf8mb4 charset; falls back to TEXT on
        other engines, which default to UTF-8 and support longer TEXT fields.
    """
    impl = Text

    def load_dialect_impl(self, dialect):
        if dialect.name == 'mysql':
            return dialect.type_descriptor(
                LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
        else:
            return dialect.type_descriptor(Text())


def _chance_duplication(pop_size, samples):
    """ The chance of randomly selecting a duplicate when you choose the
        specified number of samples from the specified population size.

        This is the birthday-problem approximation: the samples form
        samples * (samples - 1) / 2 pairs, each pair is distinct with
        probability (pop_size - 1) / pop_size, so the chance that at least
        one pair collides is 1 - ((pop_size - 1) / pop_size) ** pairs.
    """
    pairs = (samples * (samples - 1)) / 2.0
    unique = (pop_size - 1.0) / pop_size
    all_unique = pow(unique, pairs)
    return 1 - all_unique


def _num_checks(pop_size, desired):
    """ Binary search for (approximately) the largest number of entries that
        can be drawn from a population of pop_size while keeping the collision
        probability at or below the desired level.
    """
    s_max = pop_size
    s_min = 0
    last_test = -1
    s_test = s_max

    while s_max > s_min and last_test != s_test:
        last_test = s_test
        # Integer midpoint: true division would produce a float under Python 3,
        # which would later break the list slicing in yield_random_entries.
        s_test = (s_max + s_min) // 2
        chance = _chance_duplication(pop_size, s_test)
        if chance > desired:
            s_max = s_test - 1
        else:
            s_min = s_test

    return s_test


def yield_random_entries(batch_query, batch_size, collision_chance):
    """ Yields semi-random items from a query in a database-friendly way until
        no more items match the base query. Pulls batches of batch_size from
        the query and yields enough items from each batch so that concurrent
        workers have a reduced chance of selecting the same items.

        For example, if each batch returns 10,000 entries and the desired
        collision_chance is 0.03, only 25 random entries are used from each
        batch before going back to the database for a fresh one.
    """
    # Seed with some data which will pass the loop condition but which is
    # immediately discarded.
    all_candidates = [1]
    while len(all_candidates) > 0:
        all_candidates = list(batch_query().limit(batch_size))
        shuffle(all_candidates)
        num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
        logger.debug('Found %s/%s matching entries, processing %s',
                     len(all_candidates), batch_size, num_selections)
        candidates = all_candidates[0:num_selections]
        for candidate in candidates:
            yield candidate
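

# ---------------------------------------------------------------------------
# Usage sketch. This demo is not part of the original module; it is a minimal
# illustration of the calling convention under stated assumptions: SQLAlchemy
# 1.4+ (for the sqlalchemy.orm.declarative_base import path) and a hypothetical
# Item model with a `processed` flag, invented purely for this example. The key
# contract is that batch_query must be a zero-argument callable returning a
# fresh query that supports .limit(), since the generator re-executes it for
# every batch.
if __name__ == '__main__':
    from sqlalchemy import Boolean, Column, Integer, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class Item(Base):
        __tablename__ = 'items'
        id = Column(Integer, primary_key=True)
        processed = Column(Boolean, default=False, nullable=False)

    # Build an in-memory SQLite database with 100 unprocessed items.
    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    session.add_all([Item() for _ in range(100)])
    session.commit()

    def unprocessed():
        # Re-executed by yield_random_entries for each new batch; items marked
        # processed below drop out of subsequent batches.
        return session.query(Item).filter(Item.processed.is_(False))

    # Each worker running a loop like this processes only a semi-random subset
    # of each batch, reducing (but not eliminating) the chance that two
    # concurrent workers pick the same item.
    processed_count = 0
    for item in yield_random_entries(unprocessed, batch_size=50,
                                     collision_chance=0.03):
        item.processed = True
        session.commit()
        processed_count += 1

    print('Processed %s items' % processed_count)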