diff --git a/data/model/build.py b/data/model/build.py
index 70501c5dc..7f2479772 100644
--- a/data/model/build.py
+++ b/data/model/build.py
@@ -4,7 +4,7 @@ from peewee import JOIN_LEFT_OUTER
 from datetime import timedelta, datetime
 
 from data.database import (BuildTriggerService, RepositoryBuildTrigger, Repository, Namespace, User,
-                           RepositoryBuild, BUILD_PHASE, db_for_update)
+                           RepositoryBuild, BUILD_PHASE, db_for_update, db_random_func)
 from data.model import (InvalidBuildTriggerException, InvalidRepositoryBuildException,
                         db_transaction, user as user_model)
 
@@ -163,11 +163,23 @@ def cancel_repository_build(build, work_queue):
   return True
 
 
-def archivable_buildlogs_query():
+def get_archivable_build():
   presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE
-  return (RepositoryBuild
-          .select()
+  candidates = (RepositoryBuild
+                .select(RepositoryBuild.id)
                 .where((RepositoryBuild.phase == BUILD_PHASE.COMPLETE) |
                        (RepositoryBuild.phase == BUILD_PHASE.ERROR) |
                        (RepositoryBuild.started < presumed_dead_date),
-                       RepositoryBuild.logs_archived == False))
+                       RepositoryBuild.logs_archived == False)
+                .limit(50)
+                .alias('candidates'))
+
+  try:
+    return (RepositoryBuild
+            .select(candidates.c.id)
+            .from_(candidates)
+            .order_by(db_random_func())
+            .get())
+  except RepositoryBuild.DoesNotExist:
+    return None
+
diff --git a/test/test_queries.py b/test/test_queries.py
new file mode 100644
index 000000000..6ef9b24d9
--- /dev/null
+++ b/test/test_queries.py
@@ -0,0 +1,40 @@
+import unittest
+
+from app import app
+from initdb import setup_database_for_testing, finished_database_for_testing
+from data import model
+from data.database import RepositoryBuild
+
+ADMIN_ACCESS_USER = 'devtable'
+SIMPLE_REPO = 'simple'
+
+class TestSpecificQueries(unittest.TestCase):
+  def setUp(self):
+    setup_database_for_testing(self)
+    self.app = app.test_client()
+    self.ctx = app.test_request_context()
+    self.ctx.__enter__()
+
+  def tearDown(self):
+    finished_database_for_testing(self)
+    self.ctx.__exit__(True, None, None)
+
+  def test_archivable_buildlogs(self):
+    # Make sure there are no archivable logs.
+    result = model.build.get_archivable_build()
+    self.assertIsNone(result)
+
+    # Add a build that we know needs to be archived.
+    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    token = model.token.create_access_token(repo, 'write')
+    created = RepositoryBuild.create(repository=repo, access_token=token,
+                                     phase=model.build.BUILD_PHASE.COMPLETE,
+                                     logs_archived=False, job_config='{}',
+                                     display_name='')
+
+    # Make sure we now find an archivable build.
+    result = model.build.get_archivable_build()
+    self.assertEquals(created.id, result.id)
+
+if __name__ == '__main__':
+  unittest.main()
\ No newline at end of file
diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py
index f4882c453..86fb13387 100644
--- a/workers/buildlogsarchiver.py
+++ b/workers/buildlogsarchiver.py
@@ -24,34 +24,34 @@ class ArchiveBuildLogsWorker(Worker):
   def _archive_redis_buildlogs(self):
     """ Archive a single build, choosing a candidate at random. This process must be idempotent to
         avoid needing two-phase commit. """
-    try:
-      # Get a random build to archive
-      to_archive = model.build.archivable_buildlogs_query().order_by(db_random_func()).get()
-      logger.debug('Archiving: %s', to_archive.uuid)
-
-      length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
-      to_encode = {
-        'start': 0,
-        'total': length,
-        'logs': entries,
-      }
-
-      with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
-        with GzipFile('testarchive', fileobj=tempfile) as zipstream:
-          for chunk in StreamingJSONEncoder().iterencode(to_encode):
-            zipstream.write(chunk)
-
-        tempfile.seek(0)
-        log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
-                               file_id=to_archive.uuid)
-
-      to_archive.logs_archived = True
-      to_archive.save()
-
-      build_logs.expire_log_entries(to_archive.uuid)
-
-    except RepositoryBuild.DoesNotExist:
+    # Get a random build to archive
+    to_archive = model.build.get_archivable_build()
+    if to_archive is None:
       logger.debug('No more builds to archive')
+      return
+
+    logger.debug('Archiving: %s', to_archive.uuid)
+
+    length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
+    to_encode = {
+      'start': 0,
+      'total': length,
+      'logs': entries,
+    }
+
+    with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+      with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+        for chunk in StreamingJSONEncoder().iterencode(to_encode):
+          zipstream.write(chunk)
+
+      tempfile.seek(0)
+      log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                             file_id=to_archive.uuid)
+
+    to_archive.logs_archived = True
+    to_archive.save()
+
+    build_logs.expire_log_entries(to_archive.uuid)
 
 
 if __name__ == "__main__":