Add a RepositorySearchScore table and calculation to the RAC worker
This will be used in a followup PR to order search results instead of the RAC join. Currently, the join with the RAC table in search results in a lookup of ~600K rows, which causes searching to take ~6s. This PR denormalizes the data we need, as well as allowing us to score based on a wider band (6 months vs the current 1 week).
This commit is contained in:
parent
1bfca871ec
commit
df3f47c79a
10 changed files with 243 additions and 50 deletions
|
@ -451,8 +451,8 @@ class User(BaseModel):
|
||||||
TagManifest, AccessToken, OAuthAccessToken, BlobUpload,
|
TagManifest, AccessToken, OAuthAccessToken, BlobUpload,
|
||||||
RepositoryNotification, OAuthAuthorizationCode,
|
RepositoryNotification, OAuthAuthorizationCode,
|
||||||
RepositoryActionCount, TagManifestLabel, Tag,
|
RepositoryActionCount, TagManifestLabel, Tag,
|
||||||
ManifestLabel, BlobUploading, TeamSync} | beta_classes
|
ManifestLabel, BlobUploading, TeamSync,
|
||||||
|
RepositorySearchScore} | beta_classes
|
||||||
delete_instance_filtered(self, User, delete_nullable, skip_transitive_deletes)
|
delete_instance_filtered(self, User, delete_nullable, skip_transitive_deletes)
|
||||||
|
|
||||||
|
|
||||||
|
@ -584,10 +584,18 @@ class Repository(BaseModel):
|
||||||
# These models don't need to use transitive deletes, because the referenced objects
|
# These models don't need to use transitive deletes, because the referenced objects
|
||||||
# are cleaned up directly
|
# are cleaned up directly
|
||||||
skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload,
|
skip_transitive_deletes = {RepositoryTag, RepositoryBuild, RepositoryBuildTrigger, BlobUpload,
|
||||||
Image, TagManifest, TagManifestLabel, Label, DerivedStorageForImage} | beta_classes
|
Image, TagManifest, TagManifestLabel, Label, DerivedStorageForImage,
|
||||||
|
RepositorySearchScore} | beta_classes
|
||||||
|
|
||||||
delete_instance_filtered(self, Repository, delete_nullable, skip_transitive_deletes)
|
delete_instance_filtered(self, Repository, delete_nullable, skip_transitive_deletes)
|
||||||
|
|
||||||
|
|
||||||
|
class RepositorySearchScore(BaseModel):
|
||||||
|
repository = ForeignKeyField(Repository, unique=True)
|
||||||
|
score = BigIntegerField(index=True, default=0)
|
||||||
|
last_updated = DateTimeField(null=True)
|
||||||
|
|
||||||
|
|
||||||
class Star(BaseModel):
|
class Star(BaseModel):
|
||||||
user = ForeignKeyField(User)
|
user = ForeignKeyField(User)
|
||||||
repository = ForeignKeyField(Repository)
|
repository = ForeignKeyField(Repository)
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
"""Add RepositorySearchScore table
|
||||||
|
|
||||||
|
Revision ID: f30984525c86
|
||||||
|
Revises: be8d1c402ce0
|
||||||
|
Create Date: 2017-04-04 14:30:13.270728
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = 'f30984525c86'
|
||||||
|
down_revision = 'be8d1c402ce0'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import mysql
|
||||||
|
|
||||||
|
def upgrade(tables):
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table('repositorysearchscore',
|
||||||
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('repository_id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('score', sa.BigInteger(), nullable=False),
|
||||||
|
sa.Column('last_updated', sa.DateTime(), nullable=True),
|
||||||
|
sa.ForeignKeyConstraint(['repository_id'], ['repository.id'], name=op.f('fk_repositorysearchscore_repository_id_repository')),
|
||||||
|
sa.PrimaryKeyConstraint('id', name=op.f('pk_repositorysearchscore'))
|
||||||
|
)
|
||||||
|
op.create_index('repositorysearchscore_repository_id', 'repositorysearchscore', ['repository_id'], unique=True)
|
||||||
|
op.create_index('repositorysearchscore_score', 'repositorysearchscore', ['score'], unique=False)
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade(tables):
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_table('repositorysearchscore')
|
||||||
|
### end Alembic commands ###
|
|
@ -133,6 +133,7 @@ from data.model import (
|
||||||
oauth,
|
oauth,
|
||||||
organization,
|
organization,
|
||||||
permission,
|
permission,
|
||||||
|
repositoryactioncount,
|
||||||
release,
|
release,
|
||||||
repository,
|
repository,
|
||||||
service_keys,
|
service_keys,
|
||||||
|
|
|
@ -12,7 +12,7 @@ from data.database import (Repository, Namespace, RepositoryTag, Star, Image, Im
|
||||||
Visibility, RepositoryPermission, RepositoryActionCount,
|
Visibility, RepositoryPermission, RepositoryActionCount,
|
||||||
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
|
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
|
||||||
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
|
Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
|
||||||
db_random_func, db_concat_func)
|
db_random_func, db_concat_func, RepositorySearchScore)
|
||||||
from data.text import prefix_search
|
from data.text import prefix_search
|
||||||
from util.itertoolrecipes import take
|
from util.itertoolrecipes import take
|
||||||
|
|
||||||
|
@ -42,6 +42,7 @@ def create_repository(namespace, name, creating_user, visibility='private', repo
|
||||||
|
|
||||||
yesterday = datetime.now() - timedelta(days=1)
|
yesterday = datetime.now() - timedelta(days=1)
|
||||||
RepositoryActionCount.create(repository=repo, count=0, date=yesterday)
|
RepositoryActionCount.create(repository=repo, count=0, date=yesterday)
|
||||||
|
RepositorySearchScore.create(repository=repo, score=0)
|
||||||
|
|
||||||
if creating_user and not creating_user.organization:
|
if creating_user and not creating_user.organization:
|
||||||
RepositoryPermission.create(user=creating_user, repository=repo, role=admin)
|
RepositoryPermission.create(user=creating_user, repository=repo, role=admin)
|
||||||
|
|
118
data/model/repositoryactioncount.py
Normal file
118
data/model/repositoryactioncount.py
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from collections import namedtuple
|
||||||
|
from peewee import IntegrityError
|
||||||
|
|
||||||
|
from datetime import date, timedelta, datetime
|
||||||
|
from data.database import (Repository, LogEntry, RepositoryActionCount, RepositorySearchScore,
|
||||||
|
db_random_func, fn)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
search_bucket = namedtuple('SearchBucket', ['delta', 'days', 'weight'])
|
||||||
|
|
||||||
|
# Defines the various buckets for search scoring. Each bucket is computed using the given time
|
||||||
|
# delta from today *minus the previous bucket's time period*. Once all the actions over the
|
||||||
|
# bucket's time period have been collected, they are multiplied by the given modifier. The modifiers
|
||||||
|
# for this bucket were determined via the integral of (2/((x/183)+1)^2)/183 over the period of days
|
||||||
|
# in the bucket; this integral over 0..183 has a sum of 1, so we get a good normalize score result.
|
||||||
|
SEARCH_BUCKETS = [
|
||||||
|
search_bucket(timedelta(days=1), 1, 0.010870),
|
||||||
|
search_bucket(timedelta(days=7), 6, 0.062815),
|
||||||
|
search_bucket(timedelta(days=31), 24, 0.21604),
|
||||||
|
search_bucket(timedelta(days=183), 152, 0.71028),
|
||||||
|
]
|
||||||
|
|
||||||
|
def find_uncounted_repository():
|
||||||
|
""" Returns a repository that has not yet had an entry added into the RepositoryActionCount
|
||||||
|
table for yesterday.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Get a random repository to count.
|
||||||
|
today = date.today()
|
||||||
|
yesterday = today - timedelta(days=1)
|
||||||
|
has_yesterday_actions = (RepositoryActionCount
|
||||||
|
.select(RepositoryActionCount.repository)
|
||||||
|
.where(RepositoryActionCount.date == yesterday))
|
||||||
|
|
||||||
|
to_count = (Repository
|
||||||
|
.select()
|
||||||
|
.where(~(Repository.id << (has_yesterday_actions)))
|
||||||
|
.order_by(db_random_func()).get())
|
||||||
|
return to_count
|
||||||
|
except Repository.DoesNotExist:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def count_repository_actions(to_count):
|
||||||
|
""" Aggregates repository actions from the LogEntry table for the last day and writes them to
|
||||||
|
the RepositoryActionCount table. Return True if the repository was updated and False
|
||||||
|
otherwise.
|
||||||
|
"""
|
||||||
|
today = date.today()
|
||||||
|
yesterday = today - timedelta(days=1)
|
||||||
|
|
||||||
|
actions = (LogEntry
|
||||||
|
.select()
|
||||||
|
.where(LogEntry.repository == to_count,
|
||||||
|
LogEntry.datetime >= yesterday,
|
||||||
|
LogEntry.datetime < today)
|
||||||
|
.count())
|
||||||
|
|
||||||
|
try:
|
||||||
|
RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
|
||||||
|
return True
|
||||||
|
except IntegrityError:
|
||||||
|
logger.exception('Exception when writing count for repository')
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def update_repository_score(repo):
|
||||||
|
""" Updates the repository score entry for the given table by retrieving information from
|
||||||
|
the RepositoryActionCount table. Note that count_repository_actions for the repo should
|
||||||
|
be called first. Returns True if the row was updated and False otherwise.
|
||||||
|
"""
|
||||||
|
today = date.today()
|
||||||
|
|
||||||
|
# Retrieve the counts for each bucket and calculate the final score.
|
||||||
|
final_score = 0.0
|
||||||
|
last_end_timedelta = timedelta(days=0)
|
||||||
|
|
||||||
|
for bucket in SEARCH_BUCKETS:
|
||||||
|
start_date = today - bucket.delta
|
||||||
|
end_date = today - last_end_timedelta
|
||||||
|
last_end_timedelta = bucket.delta
|
||||||
|
|
||||||
|
query = (RepositoryActionCount
|
||||||
|
.select(fn.Sum(RepositoryActionCount.count), fn.Count(RepositoryActionCount.id))
|
||||||
|
.where(RepositoryActionCount.date >= start_date,
|
||||||
|
RepositoryActionCount.date < end_date,
|
||||||
|
RepositoryActionCount.repository == repo))
|
||||||
|
|
||||||
|
bucket_tuple = query.tuples()[0]
|
||||||
|
logger.debug('Got bucket tuple %s for bucket %s for repository %s', bucket_tuple, bucket,
|
||||||
|
repo.id)
|
||||||
|
|
||||||
|
bucket_sum = bucket_tuple[0]
|
||||||
|
bucket_count = bucket_tuple[1]
|
||||||
|
if not bucket_count:
|
||||||
|
continue
|
||||||
|
|
||||||
|
bucket_score = bucket_sum / (bucket_count * 1.0)
|
||||||
|
final_score += bucket_score * bucket.weight
|
||||||
|
|
||||||
|
# Update the existing repo search score row or create a new one.
|
||||||
|
normalized_score = int(final_score * 100.0)
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
search_score_row = RepositorySearchScore.get(repository=repo)
|
||||||
|
search_score_row.last_updated = datetime.now()
|
||||||
|
search_score_row.score = normalized_score
|
||||||
|
search_score_row.save()
|
||||||
|
return True
|
||||||
|
except RepositorySearchScore.DoesNotExist:
|
||||||
|
RepositorySearchScore.create(repository=repo, score=normalized_score, last_updated=today)
|
||||||
|
return True
|
||||||
|
except IntegrityError:
|
||||||
|
logger.debug('RepositorySearchScore row already existed; skipping')
|
||||||
|
return False
|
38
data/model/test/test_repositoryactioncount.py
Normal file
38
data/model/test/test_repositoryactioncount.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
from datetime import date, timedelta
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from data.database import RepositoryActionCount, RepositorySearchScore
|
||||||
|
from data.model.repository import create_repository
|
||||||
|
from data.model.repositoryactioncount import update_repository_score, SEARCH_BUCKETS
|
||||||
|
from test.fixtures import database_uri, init_db_path, sqlitedb_file
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('bucket_sums,expected_score', [
|
||||||
|
((0, 0, 0, 0), 0),
|
||||||
|
|
||||||
|
((1, 6, 24, 152), 100),
|
||||||
|
((2, 6, 24, 152), 101),
|
||||||
|
((1, 6, 24, 304), 171),
|
||||||
|
|
||||||
|
((100, 480, 24, 152), 703),
|
||||||
|
((1, 6, 24, 15200), 7131),
|
||||||
|
|
||||||
|
((300, 500, 1000, 0), 1733),
|
||||||
|
((5000, 0, 0, 0), 5434),
|
||||||
|
])
|
||||||
|
def test_update_repository_score(bucket_sums, expected_score, database_uri):
|
||||||
|
# Create a new repository.
|
||||||
|
repo = create_repository('devtable', 'somenewrepo', None, repo_kind='image')
|
||||||
|
|
||||||
|
# Delete the RAC created in create_repository.
|
||||||
|
RepositoryActionCount.delete().where(RepositoryActionCount.repository == repo).execute()
|
||||||
|
|
||||||
|
# Add RAC rows for each of the buckets.
|
||||||
|
for index, bucket in enumerate(SEARCH_BUCKETS):
|
||||||
|
for day in range(0, bucket.days):
|
||||||
|
RepositoryActionCount.create(repository=repo,
|
||||||
|
count=(bucket_sums[index] / bucket.days * 1.0),
|
||||||
|
date=date.today() - bucket.delta + timedelta(days=day))
|
||||||
|
|
||||||
|
assert update_repository_score(repo)
|
||||||
|
assert RepositorySearchScore.get(repository=repo).score == expected_score
|
|
@ -854,8 +854,13 @@ def populate_database(minimal=False, with_storage=False):
|
||||||
|
|
||||||
model.user.create_user_prompt(new_user_4, 'confirm_username')
|
model.user.create_user_prompt(new_user_4, 'confirm_username')
|
||||||
|
|
||||||
while repositoryactioncounter.count_repository_actions():
|
while True:
|
||||||
pass
|
to_count = model.repositoryactioncount.find_uncounted_repository()
|
||||||
|
if not to_count:
|
||||||
|
break
|
||||||
|
|
||||||
|
model.repositoryactioncount.count_repository_actions(to_count)
|
||||||
|
model.repositoryactioncount.update_repository_score(to_count)
|
||||||
|
|
||||||
|
|
||||||
def find_models_missing_data():
|
def find_models_missing_data():
|
||||||
|
|
Binary file not shown.
|
@ -1,62 +1,39 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from datetime import date, timedelta
|
|
||||||
|
|
||||||
from app import app # This is required to initialize the database.
|
from app import app # This is required to initialize the database.
|
||||||
from data.database import Repository, LogEntry, RepositoryActionCount, db_random_func
|
from data import model
|
||||||
from workers.worker import Worker
|
from workers.worker import Worker
|
||||||
|
|
||||||
POLL_PERIOD_SECONDS = 10
|
POLL_PERIOD_SECONDS = 10
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def count_repository_actions():
|
|
||||||
""" Aggregates repository actions from the LogEntry table and writes them to
|
|
||||||
the RepositoryActionCount table. Returns the number of repositories for
|
|
||||||
which actions were logged. Returns 0 when there is no more work.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Get a random repository to count.
|
|
||||||
today = date.today()
|
|
||||||
yesterday = today - timedelta(days=1)
|
|
||||||
has_yesterday_actions = (RepositoryActionCount
|
|
||||||
.select(RepositoryActionCount.repository)
|
|
||||||
.where(RepositoryActionCount.date == yesterday))
|
|
||||||
|
|
||||||
to_count = (Repository
|
|
||||||
.select()
|
|
||||||
.where(~(Repository.id << (has_yesterday_actions)))
|
|
||||||
.order_by(db_random_func()).get())
|
|
||||||
|
|
||||||
logger.debug('Counting: %s', to_count.id)
|
|
||||||
|
|
||||||
actions = (LogEntry
|
|
||||||
.select()
|
|
||||||
.where(LogEntry.repository == to_count,
|
|
||||||
LogEntry.datetime >= yesterday,
|
|
||||||
LogEntry.datetime < today)
|
|
||||||
.count())
|
|
||||||
|
|
||||||
# Create the row.
|
|
||||||
try:
|
|
||||||
RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
|
|
||||||
return 1
|
|
||||||
except:
|
|
||||||
logger.exception('Exception when writing count')
|
|
||||||
except Repository.DoesNotExist:
|
|
||||||
logger.debug('No further repositories to count')
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
class RepositoryActionCountWorker(Worker):
|
class RepositoryActionCountWorker(Worker):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(RepositoryActionCountWorker, self).__init__()
|
super(RepositoryActionCountWorker, self).__init__()
|
||||||
self.add_operation(self._count_repository_actions, POLL_PERIOD_SECONDS)
|
self.add_operation(self._count_repository_actions, POLL_PERIOD_SECONDS)
|
||||||
|
|
||||||
def _count_repository_actions(self):
|
def _count_repository_actions(self):
|
||||||
""" Counts actions for a random repository for the previous day. """
|
""" Counts actions and aggregates search scores for a random repository for the
|
||||||
count_repository_actions()
|
previous day. """
|
||||||
|
to_count = model.repositoryactioncount.find_uncounted_repository()
|
||||||
|
if to_count is None:
|
||||||
|
logger.debug('No further repositories to count')
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug('Found repository #%s to count', to_count.id)
|
||||||
|
was_counted = model.repositoryactioncount.count_repository_actions(to_count)
|
||||||
|
if not was_counted:
|
||||||
|
logger.debug('Repository #%s was counted by another worker', to_count.id)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug('Updating search score for repository #%s', to_count.id)
|
||||||
|
was_updated = model.repositoryactioncount.update_repository_score(to_count)
|
||||||
|
if not was_updated:
|
||||||
|
logger.debug('Repository #%s had its search score updated by another worker', to_count.id)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug('Repository #%s search score updated', to_count.id)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
worker = RepositoryActionCountWorker()
|
worker = RepositoryActionCountWorker()
|
||||||
|
|
10
workers/test/test_repositoryactioncounter.py
Normal file
10
workers/test/test_repositoryactioncounter.py
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
from data import model, database
|
||||||
|
from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
|
||||||
|
from workers.repositoryactioncounter import RepositoryActionCountWorker
|
||||||
|
|
||||||
|
def test_repositoryactioncount(app):
|
||||||
|
database.RepositoryActionCount.delete().execute()
|
||||||
|
database.RepositorySearchScore.delete().execute()
|
||||||
|
|
||||||
|
rac = RepositoryActionCountWorker()
|
||||||
|
rac._count_repository_actions()
|
Reference in a new issue