Merge pull request #2575 from jakedt/fixrace
Resolve race condition between multiple log archivers
Commit 0d04fd8bd2
2 changed files with 26 additions and 14 deletions
@@ -248,3 +248,13 @@ def get_archivable_build():
     return RepositoryBuild.get(id=found_id)
   except RepositoryBuild.DoesNotExist:
     return None
+
+
+def mark_build_archived(build_uuid):
+  """ Mark a build as archived, and return True if we were the ones who actually
+      updated the row. """
+  return (RepositoryBuild
+          .update(logs_archived=True)
+          .where(RepositoryBuild.uuid == build_uuid,
+                 RepositoryBuild.logs_archived == False)
+          .execute()) > 0
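The hunk above is in the build model module (where get_archivable_build() and RepositoryBuild live). Instead of reading logs_archived and writing it back in a separate step, mark_build_archived() folds the check and the write into one conditional UPDATE, so the database arbitrates the race: the statement only matches a row whose flag is still false, and the affected-row count tells the caller whether it was the one that flipped it. Below is a minimal, self-contained sketch of that test-and-set pattern using peewee against an in-memory SQLite database; the Build model and try_claim() helper are illustrative stand-ins, not the Quay code.

from peewee import SqliteDatabase, Model, CharField, BooleanField

db = SqliteDatabase(':memory:')

class Build(Model):
  uuid = CharField(unique=True)
  logs_archived = BooleanField(default=False)

  class Meta:
    database = db

db.connect()
db.create_tables([Build])
Build.create(uuid='abc-123')

def try_claim(build_uuid):
  # Issues roughly: UPDATE build SET logs_archived = true
  #                 WHERE uuid = ? AND logs_archived = false
  # execute() returns the number of rows modified; only the first caller
  # still matches the row, so only the first caller sees a count > 0.
  return (Build
          .update(logs_archived=True)
          .where(Build.uuid == build_uuid,
                 Build.logs_archived == False)
          .execute()) > 0

print(try_claim('abc-123'))  # True:  this caller claimed the build
print(try_claim('abc-123'))  # False: someone else already did

Because a single UPDATE is atomic with respect to concurrent statements, the claim needs no explicit locking or transaction.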
@@ -15,6 +15,7 @@ MEMORY_TEMPFILE_SIZE = 64 * 1024 # Large enough to handle approximately 99% of
 
 logger = logging.getLogger(__name__)
 
+
 class ArchiveBuildLogsWorker(Worker):
   def __init__(self):
     super(ArchiveBuildLogsWorker, self).__init__()
@@ -38,22 +39,23 @@ class ArchiveBuildLogsWorker(Worker):
       'logs': entries,
     }
 
-    with CloseForLongOperation(app.config):
-      with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
-        with GzipFile('testarchive', fileobj=tempfile) as zipstream:
-          for chunk in StreamingJSONEncoder().iterencode(to_encode):
-            zipstream.write(chunk)
+    if length > 0:
+      with CloseForLongOperation(app.config):
+        with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+          with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+            for chunk in StreamingJSONEncoder().iterencode(to_encode):
+              zipstream.write(chunk)
 
-        tempfile.seek(0)
-        log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
-                               file_id=to_archive.uuid)
+          tempfile.seek(0)
+          log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                                 file_id=to_archive.uuid)
 
-    to_update = model.build.get_repository_build(to_archive.uuid)
-    to_update.logs_archived = True
-    to_update.save()
-
-    build_logs.expire_status(to_update.uuid)
-    build_logs.delete_log_entries(to_update.uuid)
+    we_updated = model.build.mark_build_archived(to_archive.uuid)
+    if we_updated:
+      build_logs.expire_status(to_archive.uuid)
+      build_logs.delete_log_entries(to_archive.uuid)
+    else:
+      logger.debug('Another worker pre-empted us when archiving: %s', to_archive.uuid)
 
 
 if __name__ == "__main__":
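The remaining hunks are in the build-logs archiver worker (ArchiveBuildLogsWorker). Two things change: the gzip-and-upload step only runs when there are log entries to archive (if length > 0), and the Redis cleanup (expire_status / delete_log_entries) is now gated on mark_build_archived() returning True, so only the worker that actually claimed the build purges the entries; a losing worker just logs that it was pre-empted. The sketch below is a hypothetical, self-contained simulation of two archivers racing for the same build: claim_build() stands in for model.build.mark_build_archived(), with an in-process lock playing the role of the database's row-level atomicity, and archive_worker() stands in for the worker's cleanup path.

import threading

_claim_lock = threading.Lock()
_archived = set()

def claim_build(build_uuid):
  # Hypothetical stand-in for model.build.mark_build_archived(): an atomic
  # test-and-set that returns True only for the first caller per build.
  with _claim_lock:
    if build_uuid in _archived:
      return False
    _archived.add(build_uuid)
    return True

def archive_worker(name, build_uuid):
  # Stand-in for the worker's cleanup path: only the winner deletes logs.
  if claim_build(build_uuid):
    print('%s archived %s and deleted its log entries' % (name, build_uuid))
  else:
    print('%s was pre-empted when archiving %s' % (name, build_uuid))

threads = [threading.Thread(target=archive_worker, args=(name, 'build-1'))
           for name in ('worker-a', 'worker-b')]
for t in threads:
  t.start()
for t in threads:
  t.join()

Whichever interleaving the scheduler picks, exactly one thread prints the "archived" line, mirroring how only one archiver purges the Redis log entries while the other backs off.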