diff --git a/data/database.py b/data/database.py index 8a7a87004..caf4f560b 100644 --- a/data/database.py +++ b/data/database.py @@ -451,8 +451,8 @@ class User(BaseModel): TagManifest, AccessToken, OAuthAccessToken, BlobUpload, RepositoryNotification, OAuthAuthorizationCode, RepositoryActionCount, TagManifestLabel, Tag, - ManifestLabel, BlobUploading, TeamSync} | beta_classes - + ManifestLabel, BlobUploading, TeamSync, + RepositorySearchScore} | beta_classes delete_instance_filtered(self, User, delete_nullable, skip_transitive_deletes) @@ -584,10 +584,18 @@ class Repository(BaseModel): # These models don't need to use transitive deletes, because the referenced objects # are cleaned up directly skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload, - Image, TagManifest, TagManifestLabel, Label, DerivedStorageForImage} | beta_classes + Image, TagManifest, TagManifestLabel, Label, DerivedStorageForImage, + RepositorySearchScore} | beta_classes + delete_instance_filtered(self, Repository, delete_nullable, skip_transitive_deletes) +class RepositorySearchScore(BaseModel): + repository = ForeignKeyField(Repository, unique=True) + score = BigIntegerField(index=True, default=0) + last_updated = DateTimeField(null=True) + + class Star(BaseModel): user = ForeignKeyField(User) repository = ForeignKeyField(Repository) diff --git a/data/migrations/versions/f30984525c86_add_repositorysearchscore_table.py b/data/migrations/versions/f30984525c86_add_repositorysearchscore_table.py new file mode 100644 index 000000000..48795acff --- /dev/null +++ b/data/migrations/versions/f30984525c86_add_repositorysearchscore_table.py @@ -0,0 +1,35 @@ +"""Add RepositorySearchScore table + +Revision ID: f30984525c86 +Revises: be8d1c402ce0 +Create Date: 2017-04-04 14:30:13.270728 + +""" + +# revision identifiers, used by Alembic. +revision = 'f30984525c86' +down_revision = 'be8d1c402ce0' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +def upgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.create_table('repositorysearchscore', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('repository_id', sa.Integer(), nullable=False), + sa.Column('score', sa.BigInteger(), nullable=False), + sa.Column('last_updated', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['repository_id'], ['repository.id'], name=op.f('fk_repositorysearchscore_repository_id_repository')), + sa.PrimaryKeyConstraint('id', name=op.f('pk_repositorysearchscore')) + ) + op.create_index('repositorysearchscore_repository_id', 'repositorysearchscore', ['repository_id'], unique=True) + op.create_index('repositorysearchscore_score', 'repositorysearchscore', ['score'], unique=False) + ### end Alembic commands ### + + +def downgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('repositorysearchscore') + ### end Alembic commands ### diff --git a/data/model/__init__.py b/data/model/__init__.py index 09521f9e4..41a5bbf80 100644 --- a/data/model/__init__.py +++ b/data/model/__init__.py @@ -133,6 +133,7 @@ from data.model import ( oauth, organization, permission, + repositoryactioncount, release, repository, service_keys, diff --git a/data/model/repository.py b/data/model/repository.py index 26733e1fa..ddaaa2e4b 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -12,7 +12,7 @@ from data.database import (Repository, Namespace, RepositoryTag, Star, Image, Im Visibility, RepositoryPermission, RepositoryActionCount, Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage, Label, TagManifestLabel, db_for_update, get_epoch_timestamp, - db_random_func, db_concat_func) + db_random_func, db_concat_func, RepositorySearchScore) from data.text import prefix_search from util.itertoolrecipes import take @@ -42,6 +42,7 @@ def create_repository(namespace, name, creating_user, visibility='private', repo yesterday = datetime.now() - timedelta(days=1) RepositoryActionCount.create(repository=repo, count=0, date=yesterday) + RepositorySearchScore.create(repository=repo, score=0) if creating_user and not creating_user.organization: RepositoryPermission.create(user=creating_user, repository=repo, role=admin) diff --git a/data/model/repositoryactioncount.py b/data/model/repositoryactioncount.py new file mode 100644 index 000000000..66c1f9c4c --- /dev/null +++ b/data/model/repositoryactioncount.py @@ -0,0 +1,118 @@ +import logging + +from collections import namedtuple +from peewee import IntegrityError + +from datetime import date, timedelta, datetime +from data.database import (Repository, LogEntry, RepositoryActionCount, RepositorySearchScore, + db_random_func, fn) + +logger = logging.getLogger(__name__) + +search_bucket = namedtuple('SearchBucket', ['delta', 'days', 'weight']) + +# Defines the various buckets for search scoring. Each bucket is computed using the given time +# delta from today *minus the previous bucket's time period*. Once all the actions over the +# bucket's time period have been collected, they are multiplied by the given modifier. The modifiers +# for this bucket were determined via the integral of (2/((x/183)+1)^2)/183 over the period of days +# in the bucket; this integral over 0..183 has a sum of 1, so we get a good normalize score result. +SEARCH_BUCKETS = [ + search_bucket(timedelta(days=1), 1, 0.010870), + search_bucket(timedelta(days=7), 6, 0.062815), + search_bucket(timedelta(days=31), 24, 0.21604), + search_bucket(timedelta(days=183), 152, 0.71028), +] + +def find_uncounted_repository(): + """ Returns a repository that has not yet had an entry added into the RepositoryActionCount + table for yesterday. + """ + try: + # Get a random repository to count. + today = date.today() + yesterday = today - timedelta(days=1) + has_yesterday_actions = (RepositoryActionCount + .select(RepositoryActionCount.repository) + .where(RepositoryActionCount.date == yesterday)) + + to_count = (Repository + .select() + .where(~(Repository.id << (has_yesterday_actions))) + .order_by(db_random_func()).get()) + return to_count + except Repository.DoesNotExist: + return 0 + + +def count_repository_actions(to_count): + """ Aggregates repository actions from the LogEntry table for the last day and writes them to + the RepositoryActionCount table. Return True if the repository was updated and False + otherwise. + """ + today = date.today() + yesterday = today - timedelta(days=1) + + actions = (LogEntry + .select() + .where(LogEntry.repository == to_count, + LogEntry.datetime >= yesterday, + LogEntry.datetime < today) + .count()) + + try: + RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions) + return True + except IntegrityError: + logger.exception('Exception when writing count for repository') + return False + + +def update_repository_score(repo): + """ Updates the repository score entry for the given table by retrieving information from + the RepositoryActionCount table. Note that count_repository_actions for the repo should + be called first. Returns True if the row was updated and False otherwise. + """ + today = date.today() + + # Retrieve the counts for each bucket and calculate the final score. + final_score = 0.0 + last_end_timedelta = timedelta(days=0) + + for bucket in SEARCH_BUCKETS: + start_date = today - bucket.delta + end_date = today - last_end_timedelta + last_end_timedelta = bucket.delta + + query = (RepositoryActionCount + .select(fn.Sum(RepositoryActionCount.count), fn.Count(RepositoryActionCount.id)) + .where(RepositoryActionCount.date >= start_date, + RepositoryActionCount.date < end_date, + RepositoryActionCount.repository == repo)) + + bucket_tuple = query.tuples()[0] + logger.debug('Got bucket tuple %s for bucket %s for repository %s', bucket_tuple, bucket, + repo.id) + + bucket_sum = bucket_tuple[0] + bucket_count = bucket_tuple[1] + if not bucket_count: + continue + + bucket_score = bucket_sum / (bucket_count * 1.0) + final_score += bucket_score * bucket.weight + + # Update the existing repo search score row or create a new one. + normalized_score = int(final_score * 100.0) + try: + try: + search_score_row = RepositorySearchScore.get(repository=repo) + search_score_row.last_updated = datetime.now() + search_score_row.score = normalized_score + search_score_row.save() + return True + except RepositorySearchScore.DoesNotExist: + RepositorySearchScore.create(repository=repo, score=normalized_score, last_updated=today) + return True + except IntegrityError: + logger.debug('RepositorySearchScore row already existed; skipping') + return False diff --git a/data/model/test/test_repositoryactioncount.py b/data/model/test/test_repositoryactioncount.py new file mode 100644 index 000000000..7d53ca9ff --- /dev/null +++ b/data/model/test/test_repositoryactioncount.py @@ -0,0 +1,38 @@ +from datetime import date, timedelta + +import pytest + +from data.database import RepositoryActionCount, RepositorySearchScore +from data.model.repository import create_repository +from data.model.repositoryactioncount import update_repository_score, SEARCH_BUCKETS +from test.fixtures import database_uri, init_db_path, sqlitedb_file + +@pytest.mark.parametrize('bucket_sums,expected_score', [ + ((0, 0, 0, 0), 0), + + ((1, 6, 24, 152), 100), + ((2, 6, 24, 152), 101), + ((1, 6, 24, 304), 171), + + ((100, 480, 24, 152), 703), + ((1, 6, 24, 15200), 7131), + + ((300, 500, 1000, 0), 1733), + ((5000, 0, 0, 0), 5434), +]) +def test_update_repository_score(bucket_sums, expected_score, database_uri): + # Create a new repository. + repo = create_repository('devtable', 'somenewrepo', None, repo_kind='image') + + # Delete the RAC created in create_repository. + RepositoryActionCount.delete().where(RepositoryActionCount.repository == repo).execute() + + # Add RAC rows for each of the buckets. + for index, bucket in enumerate(SEARCH_BUCKETS): + for day in range(0, bucket.days): + RepositoryActionCount.create(repository=repo, + count=(bucket_sums[index] / bucket.days * 1.0), + date=date.today() - bucket.delta + timedelta(days=day)) + + assert update_repository_score(repo) + assert RepositorySearchScore.get(repository=repo).score == expected_score diff --git a/initdb.py b/initdb.py index a7e278823..855ee11b3 100644 --- a/initdb.py +++ b/initdb.py @@ -854,8 +854,13 @@ def populate_database(minimal=False, with_storage=False): model.user.create_user_prompt(new_user_4, 'confirm_username') - while repositoryactioncounter.count_repository_actions(): - pass + while True: + to_count = model.repositoryactioncount.find_uncounted_repository() + if not to_count: + break + + model.repositoryactioncount.count_repository_actions(to_count) + model.repositoryactioncount.update_repository_score(to_count) def find_models_missing_data(): diff --git a/test/data/test.db b/test/data/test.db index 55e2f8cfa..914c1c29e 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/workers/repositoryactioncounter.py b/workers/repositoryactioncounter.py index bfcf9c78b..9def95f25 100644 --- a/workers/repositoryactioncounter.py +++ b/workers/repositoryactioncounter.py @@ -1,62 +1,39 @@ import logging -from datetime import date, timedelta - from app import app # This is required to initialize the database. -from data.database import Repository, LogEntry, RepositoryActionCount, db_random_func +from data import model from workers.worker import Worker POLL_PERIOD_SECONDS = 10 logger = logging.getLogger(__name__) -def count_repository_actions(): - """ Aggregates repository actions from the LogEntry table and writes them to - the RepositoryActionCount table. Returns the number of repositories for - which actions were logged. Returns 0 when there is no more work. - """ - try: - # Get a random repository to count. - today = date.today() - yesterday = today - timedelta(days=1) - has_yesterday_actions = (RepositoryActionCount - .select(RepositoryActionCount.repository) - .where(RepositoryActionCount.date == yesterday)) - - to_count = (Repository - .select() - .where(~(Repository.id << (has_yesterday_actions))) - .order_by(db_random_func()).get()) - - logger.debug('Counting: %s', to_count.id) - - actions = (LogEntry - .select() - .where(LogEntry.repository == to_count, - LogEntry.datetime >= yesterday, - LogEntry.datetime < today) - .count()) - - # Create the row. - try: - RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions) - return 1 - except: - logger.exception('Exception when writing count') - except Repository.DoesNotExist: - logger.debug('No further repositories to count') - - return 0 - - class RepositoryActionCountWorker(Worker): def __init__(self): super(RepositoryActionCountWorker, self).__init__() self.add_operation(self._count_repository_actions, POLL_PERIOD_SECONDS) def _count_repository_actions(self): - """ Counts actions for a random repository for the previous day. """ - count_repository_actions() + """ Counts actions and aggregates search scores for a random repository for the + previous day. """ + to_count = model.repositoryactioncount.find_uncounted_repository() + if to_count is None: + logger.debug('No further repositories to count') + return + + logger.debug('Found repository #%s to count', to_count.id) + was_counted = model.repositoryactioncount.count_repository_actions(to_count) + if not was_counted: + logger.debug('Repository #%s was counted by another worker', to_count.id) + return + + logger.debug('Updating search score for repository #%s', to_count.id) + was_updated = model.repositoryactioncount.update_repository_score(to_count) + if not was_updated: + logger.debug('Repository #%s had its search score updated by another worker', to_count.id) + return + + logger.debug('Repository #%s search score updated', to_count.id) if __name__ == "__main__": worker = RepositoryActionCountWorker() diff --git a/workers/test/test_repositoryactioncounter.py b/workers/test/test_repositoryactioncounter.py new file mode 100644 index 000000000..e2dd36d83 --- /dev/null +++ b/workers/test/test_repositoryactioncounter.py @@ -0,0 +1,10 @@ +from data import model, database +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file +from workers.repositoryactioncounter import RepositoryActionCountWorker + +def test_repositoryactioncount(app): + database.RepositoryActionCount.delete().execute() + database.RepositorySearchScore.delete().execute() + + rac = RepositoryActionCountWorker() + rac._count_repository_actions()