From a159bd3e7746af1dc7e8a381456aee729cda5f45 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Mon, 24 Apr 2017 13:37:25 -0400
Subject: [PATCH] Resolve race condition between multiple log archivers

---
 data/model/build.py          | 10 ++++++++++
 workers/buildlogsarchiver.py | 30 ++++++++++++++++--------------
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/data/model/build.py b/data/model/build.py
index 346d9ebef..8d2dc6048 100644
--- a/data/model/build.py
+++ b/data/model/build.py
@@ -248,3 +248,13 @@ def get_archivable_build():
     return RepositoryBuild.get(id=found_id)
   except RepositoryBuild.DoesNotExist:
     return None
+
+
+def mark_build_archived(build_uuid):
+  """ Mark a build as archived, and return True if we were the ones who actually
+      updated the row. """
+  return (RepositoryBuild
+          .update(logs_archived=True)
+          .where(RepositoryBuild.uuid == build_uuid,
+                 RepositoryBuild.logs_archived == False)
+          .execute()) > 0
diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py
index 0c17c7646..7b9f2ba98 100644
--- a/workers/buildlogsarchiver.py
+++ b/workers/buildlogsarchiver.py
@@ -15,6 +15,7 @@ MEMORY_TEMPFILE_SIZE = 64 * 1024 # Large enough to handle approximately 99% of

 logger = logging.getLogger(__name__)

+
 class ArchiveBuildLogsWorker(Worker):
   def __init__(self):
     super(ArchiveBuildLogsWorker, self).__init__()
@@ -38,22 +39,23 @@ class ArchiveBuildLogsWorker(Worker):
       'logs': entries,
     }

-    with CloseForLongOperation(app.config):
-      with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
-        with GzipFile('testarchive', fileobj=tempfile) as zipstream:
-          for chunk in StreamingJSONEncoder().iterencode(to_encode):
-            zipstream.write(chunk)
+    if length > 0:
+      with CloseForLongOperation(app.config):
+        with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+          with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+            for chunk in StreamingJSONEncoder().iterencode(to_encode):
+              zipstream.write(chunk)

-        tempfile.seek(0)
-        log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
-                               file_id=to_archive.uuid)
+          tempfile.seek(0)
+          log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                                 file_id=to_archive.uuid)

-    to_update = model.build.get_repository_build(to_archive.uuid)
-    to_update.logs_archived = True
-    to_update.save()
-
-    build_logs.expire_status(to_update.uuid)
-    build_logs.delete_log_entries(to_update.uuid)
+    we_updated = model.build.mark_build_archived(to_archive.uuid)
+    if we_updated:
+      build_logs.expire_status(to_archive.uuid)
+      build_logs.delete_log_entries(to_archive.uuid)
+    else:
+      logger.debug('Another worker pre-empted us when archiving: %s', to_archive.uuid)

 if __name__ == "__main__":