From 8b3a3178b0f69ef53c88fe5c10cc7300edfbe960 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Thu, 11 Sep 2014 15:33:10 -0400
Subject: [PATCH] Finish the build logs archiver, and add cloud and local
 storage handlers that serve gzip-encoded archived content.

---
 data/archivedlogs.py         | 33 +++++++++++++++++++++++++++++----
 data/buildlogs.py            |  7 +++++++
 data/userfiles.py            |  5 +++--
 endpoints/api/build.py       |  8 ++++++--
 storage/basestorage.py       |  2 +-
 storage/cloud.py             | 10 ++++++++--
 storage/fakestorage.py       |  2 +-
 storage/local.py             |  2 +-
 test/testlogs.py             |  8 ++++++++
 workers/buildlogsarchiver.py | 23 ++++++++++++++++++-----
 10 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/data/archivedlogs.py b/data/archivedlogs.py
index bde3e9151..e190b9782 100644
--- a/data/archivedlogs.py
+++ b/data/archivedlogs.py
@@ -1,5 +1,30 @@
+import logging
+
+from gzip import GzipFile
+from flask import send_file, abort
+from cStringIO import StringIO
+
 from data.userfiles import DelegateUserfiles, UserfilesHandlers
 
+JSON_MIMETYPE = 'application/json'
+
+
+logger = logging.getLogger(__name__)
+
+
+class LogArchiveHandlers(UserfilesHandlers):
+  def get(self, file_id):
+    path = self._files.get_file_id_path(file_id)
+    try:
+      with self._storage.stream_read_file(self._locations, path) as gzip_stream:
+        with GzipFile(fileobj=gzip_stream) as unzipped:
+          unzipped_buffer = StringIO(unzipped.read())
+          return send_file(unzipped_buffer, mimetype=JSON_MIMETYPE)
+    except IOError:
+      abort(404)
+
+
 class LogArchive(object):
   def __init__(self, app=None, distributed_storage=None):
     self.app = app
@@ -17,10 +42,10 @@ class LogArchive(object):
     log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
 
     app.add_url_rule('/logarchive/<file_id>',
-                     view_func=UserfilesHandlers.as_view(handler_name,
-                                                         distributed_storage=distributed_storage,
-                                                         location=location,
-                                                         files=log_archive))
+                     view_func=LogArchiveHandlers.as_view(handler_name,
+                                                          distributed_storage=distributed_storage,
+                                                          location=location,
+                                                          files=log_archive))
 
     # register extension with app
     app.extensions = getattr(app, 'extensions', {})
diff --git a/data/buildlogs.py b/data/buildlogs.py
index 2ccd03899..4fdd24e72 100644
--- a/data/buildlogs.py
+++ b/data/buildlogs.py
@@ -51,6 +51,13 @@ class RedisBuildLogs(object):
     except redis.ConnectionError:
       raise BuildStatusRetrievalError('Cannot retrieve build logs')
 
+  def delete_log_entries(self, build_id):
+    """
+    Deletes the logs key for the given build completely.
+    """
+    self._redis.delete(self._logs_key(build_id))
+
+
   @staticmethod
   def _status_key(build_id):
     return 'builds/%s/status' % build_id
diff --git a/data/userfiles.py b/data/userfiles.py
index 8e7227e01..f4b786df5 100644
--- a/data/userfiles.py
+++ b/data/userfiles.py
@@ -81,12 +81,13 @@ class DelegateUserfiles(object):
 
     return (url, file_id)
 
-  def store_file(self, file_like_obj, content_type, file_id=None):
+  def store_file(self, file_like_obj, content_type, content_encoding=None, file_id=None):
     if file_id is None:
       file_id = str(uuid4())
 
     path = self.get_file_id_path(file_id)
-    self._storage.stream_write(self._locations, path, file_like_obj, content_type)
+    self._storage.stream_write(self._locations, path, file_like_obj, content_type,
+                               content_encoding)
     return file_id
 
   def get_file_url(self, file_id, expires_in=300, requires_cors=False):
diff --git a/endpoints/api/build.py b/endpoints/api/build.py
index 74677fadb..d792234dd 100644
--- a/endpoints/api/build.py
+++ b/endpoints/api/build.py
@@ -1,9 +1,9 @@
 import logging
 import json
 
-from flask import request
+from flask import request, redirect
 
-from app import app, userfiles as user_files, build_logs
+from app import app, userfiles as user_files, build_logs, log_archive
 from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource,
                            require_repo_read, require_repo_write, validate_json_request,
                            ApiResource, internal_only, format_date, api, Unauthorized, NotFound)
@@ -215,6 +215,10 @@ class RepositoryBuildLogs(RepositoryParamResource):
 
     build = model.get_repository_build(namespace, repository, build_uuid)
 
+    # If the logs have been archived, just redirect to the completed archive
+    if build.logs_archived:
+      return redirect(log_archive.get_file_url(build.uuid))
+
     start = int(request.args.get('start', 0))
 
     try:
diff --git a/storage/basestorage.py b/storage/basestorage.py
index 78d49aa1f..675c3a738 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -75,7 +75,7 @@ class BaseStorage(StoragePaths):
   def stream_read_file(self, path):
     raise NotImplementedError
 
-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     raise NotImplementedError
 
   def list_directory(self, path=None):
diff --git a/storage/cloud.py b/storage/cloud.py
index f7d922d6c..824c57534 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -128,7 +128,7 @@ class _CloudStorage(BaseStorage):
       raise IOError('No such key: \'{0}\''.format(path))
     return StreamReadKeyAsFile(key)
 
-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     buffer_size = 5 * 1024 * 1024
@@ -140,6 +140,9 @@ class _CloudStorage(BaseStorage):
     if content_type is not None:
       metadata['Content-Type'] = content_type
 
+    if content_encoding is not None:
+      metadata['Content-Encoding'] = content_encoding
+
     mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                       **self._upload_params)
     num_part = 1
@@ -224,7 +227,7 @@ class GoogleCloudStorage(_CloudStorage):
                                              connect_kwargs, upload_params, storage_path,
                                              access_key, secret_key, bucket_name)
 
-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
    # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -233,6 +236,9 @@ class GoogleCloudStorage(_CloudStorage):
     if content_type is not None:
       key.set_metadata('Content-Type', content_type)
 
+    if content_encoding is not None:
+      key.set_metadata('Content-Encoding', content_encoding)
+
     key.set_contents_from_stream(fp)
 
 
diff --git a/storage/fakestorage.py b/storage/fakestorage.py
index 232f5af24..cebed1e80 100644
--- a/storage/fakestorage.py
+++ b/storage/fakestorage.py
@@ -14,7 +14,7 @@ class FakeStorage(BaseStorage):
   def stream_read(self, path):
     yield ''
 
-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     pass
 
   def remove(self, path):
diff --git a/storage/local.py b/storage/local.py
index 987431e33..056c68c05 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -43,7 +43,7 @@ class LocalStorage(BaseStorage):
     path = self._init_path(path)
     return io.open(path, mode='rb')
 
-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Size is mandatory
     path = self._init_path(path, create=True)
     with open(path, mode='wb') as f:
diff --git a/test/testlogs.py b/test/testlogs.py
index 27fe7c47b..023fd159d 100644
--- a/test/testlogs.py
+++ b/test/testlogs.py
@@ -198,3 +198,11 @@ class TestBuildLogs(RedisBuildLogs):
       return None
     else:
       return super(TestBuildLogs, self).get_status(build_id)
+
+  def delete_log_entries(self, build_id):
+    if build_id == self.test_build_id:
+      return
+    if not self.allow_delegate:
+      return None
+    else:
+      return super(TestBuildLogs, self).delete_log_entries(build_id)
diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py
index c11131890..c51666d73 100644
--- a/workers/buildlogsarchiver.py
+++ b/workers/buildlogsarchiver.py
@@ -3,10 +3,12 @@ import logging
 from apscheduler.schedulers.blocking import BlockingScheduler
 from peewee import fn
 from tempfile import SpooledTemporaryFile
+from gzip import GzipFile
 
 from data import model
-from data.database import configure, RepositoryBuild
-from app import app, build_logs, log_archive
+from data.archivedlogs import JSON_MIMETYPE
+from data.database import RepositoryBuild
+from app import build_logs, log_archive
 from util.streamingjsonencoder import StreamingJSONEncoder
 
 POLL_PERIOD_SECONDS = 30
@@ -14,7 +16,7 @@ POLL_PERIOD_SECONDS = 30
 logger = logging.getLogger(__name__)
 sched = BlockingScheduler()
 
-@sched.scheduled_job(trigger='interval', seconds=5)
+@sched.scheduled_job(trigger='interval', seconds=1)
 def archive_redis_buildlogs():
   """ Archive a single build, choosing a candidate at random. This process must be idempotent to
       avoid needing two-phase commit. """
@@ -30,8 +32,19 @@ def archive_redis_buildlogs():
       'logs': entries,
     }
 
-    for chunk in StreamingJSONEncoder().iterencode(to_encode):
-      print chunk
+    with SpooledTemporaryFile() as tempfile:
+      with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+        for chunk in StreamingJSONEncoder().iterencode(to_encode):
+          zipstream.write(chunk)
+
+      tempfile.seek(0)
+      log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                             file_id=to_archive.uuid)
+
+    to_archive.logs_archived = True
+    to_archive.save()
+
+    build_logs.delete_log_entries(to_archive.uuid)
   except RepositoryBuild.DoesNotExist:
     logger.debug('No more builds to archive')
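
Not part of the patch above: a minimal, standalone sketch of the gzip round trip the archiver worker and the new LogArchiveHandlers view rely on. It compresses a JSON document into a SpooledTemporaryFile the way archive_redis_buildlogs does, then rewinds and decompresses it through GzipFile the way LogArchiveHandlers.get does. The fake_entries payload and the 'archive' member name are illustrative only, and plain json.dumps stands in for the repository's internal StreamingJSONEncoder.

import json
from gzip import GzipFile
from tempfile import SpooledTemporaryFile

# Illustrative stand-in for the log entries the worker pulls out of Redis.
fake_entries = [{'message': 'Step 1 : FROM ubuntu'}, {'message': 'Step 2 : RUN make'}]
to_encode = {'start': 0, 'total': len(fake_entries), 'logs': fake_entries}

with SpooledTemporaryFile() as tempfile:
  # Write side, as in archive_redis_buildlogs: stream the encoded JSON through
  # GzipFile into the spooled temporary file.
  with GzipFile('archive', mode='wb', fileobj=tempfile) as zipstream:
    zipstream.write(json.dumps(to_encode).encode('utf-8'))

  # Read side, as in LogArchiveHandlers.get: rewind, wrap the stream in
  # GzipFile, and recover the original JSON document.
  tempfile.seek(0)
  with GzipFile(mode='rb', fileobj=tempfile) as unzipped:
    assert json.loads(unzipped.read().decode('utf-8')) == to_encode

If this round trip holds, the worker can hand the spooled file to log_archive.store_file with content_encoding='gzip', and LogArchiveHandlers.get can serve the decompressed buffer back to clients through send_file, which is what the patch does.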