Finish the build logs archiver; add cloud and local storage handlers for gzip-encoded archived content.

Jake Moshenko 2014-09-11 15:33:10 -04:00
parent 2455c17f96
commit 8b3a3178b0
10 changed files with 82 additions and 18 deletions
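
Note: the flow introduced here is that the archiver worker gzip-compresses the streamed JSON log document and stores it with content_encoding='gzip', while the new LogArchiveHandlers view decompresses the stored bytes on read before serving them as JSON. A minimal standalone sketch of that round trip (Python 2, using a plain cStringIO buffer in place of the real storage stream; an illustration of the technique, not code from this commit):

from cStringIO import StringIO
from gzip import GzipFile

# Compress a JSON document into an in-memory buffer (the worker uses a
# SpooledTemporaryFile and the real storage engine instead).
compressed = StringIO()
with GzipFile(mode='wb', fileobj=compressed) as zipstream:
  zipstream.write('{"start": 0, "total": 1, "logs": [{"message": "hello"}]}')

# Decompress on read, the way the archive handler does before send_file().
compressed.seek(0)
with GzipFile(mode='rb', fileobj=compressed) as unzipped:
  assert '"logs"' in unzipped.read()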

View file

@@ -1,5 +1,30 @@
+import logging
+
+from gzip import GzipFile
+from flask import send_file, abort
+from cStringIO import StringIO
+
 from data.userfiles import DelegateUserfiles, UserfilesHandlers
+
+JSON_MIMETYPE = 'application/json'
+
+logger = logging.getLogger(__name__)
+
+
+class LogArchiveHandlers(UserfilesHandlers):
+  def get(self, file_id):
+    path = self._files.get_file_id_path(file_id)
+
+    try:
+      with self._storage.stream_read_file(self._locations, path) as gzip_stream:
+        with GzipFile(fileobj=gzip_stream) as unzipped:
+          unzipped_buffer = StringIO(unzipped.read())
+          return send_file(unzipped_buffer, mimetype=JSON_MIMETYPE)
+    except IOError:
+      abort(404)
+
+
 class LogArchive(object):
   def __init__(self, app=None, distributed_storage=None):
     self.app = app

@@ -17,10 +42,10 @@ class LogArchive(object):
     log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name)

     app.add_url_rule('/logarchive/<file_id>',
-                     view_func=UserfilesHandlers.as_view(handler_name,
+                     view_func=LogArchiveHandlers.as_view(handler_name,
                                                          distributed_storage=distributed_storage,
                                                          location=location,
                                                          files=log_archive))

     # register extension with app
     app.extensions = getattr(app, 'extensions', {})

View file

@@ -51,6 +51,13 @@ class RedisBuildLogs(object):
     except redis.ConnectionError:
       raise BuildStatusRetrievalError('Cannot retrieve build logs')

+  def delete_log_entries(self, build_id):
+    """
+    Deletes the logs and status keys completely.
+    """
+    self._redis.delete(self._logs_key(build_id))
+
   @staticmethod
   def _status_key(build_id):
     return 'builds/%s/status' % build_id
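
delete_log_entries keeps the archiver's cleanup step repeatable: Redis DEL on a key that is already gone is a harmless no-op. A small sketch with redis-py (the local client and the builds/<id>/logs key name are assumptions for illustration, mirroring the _status_key pattern above):

import redis

client = redis.StrictRedis()                     # assumed local Redis, illustration only
logs_key = 'builds/%s/logs' % 'some-build-uuid'  # assumed key layout
client.delete(logs_key)                          # removes the entries if they exist
client.delete(logs_key)                          # running it again returns 0 and changes nothing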

View file

@@ -81,12 +81,13 @@ class DelegateUserfiles(object):
     return (url, file_id)

-  def store_file(self, file_like_obj, content_type, file_id=None):
+  def store_file(self, file_like_obj, content_type, content_encoding=None, file_id=None):
     if file_id is None:
       file_id = str(uuid4())

     path = self.get_file_id_path(file_id)
-    self._storage.stream_write(self._locations, path, file_like_obj, content_type)
+    self._storage.stream_write(self._locations, path, file_like_obj, content_type,
+                               content_encoding)
     return file_id

   def get_file_url(self, file_id, expires_in=300, requires_cors=False):

View file

@@ -1,9 +1,9 @@
 import logging
 import json

-from flask import request
+from flask import request, redirect
-from app import app, userfiles as user_files, build_logs
+from app import app, userfiles as user_files, build_logs, log_archive
 from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource,
                            require_repo_read, require_repo_write, validate_json_request,
                            ApiResource, internal_only, format_date, api, Unauthorized, NotFound)

@@ -215,6 +215,10 @@ class RepositoryBuildLogs(RepositoryParamResource):
     build = model.get_repository_build(namespace, repository, build_uuid)

+    # If the logs have been archived, just redirect to the completed archive
+    if build.logs_archived:
+      return redirect(log_archive.get_file_url(build.uuid))
+
     start = int(request.args.get('start', 0))

     try:

View file

@@ -75,7 +75,7 @@ class BaseStorage(StoragePaths):
   def stream_read_file(self, path):
     raise NotImplementedError

-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     raise NotImplementedError

   def list_directory(self, path=None):

View file

@@ -128,7 +128,7 @@ class _CloudStorage(BaseStorage):
       raise IOError('No such key: \'{0}\''.format(path))
     return StreamReadKeyAsFile(key)

-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     buffer_size = 5 * 1024 * 1024

@@ -140,6 +140,9 @@ class _CloudStorage(BaseStorage):
     if content_type is not None:
       metadata['Content-Type'] = content_type

+    if content_encoding is not None:
+      metadata['Content-Encoding'] = content_encoding
+
     mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                       **self._upload_params)
     num_part = 1

@@ -224,7 +227,7 @@ class GoogleCloudStorage(_CloudStorage):
                              connect_kwargs, upload_params, storage_path,
                              access_key, secret_key, bucket_name)

-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)

@@ -233,6 +236,9 @@ class GoogleCloudStorage(_CloudStorage):
     if content_type is not None:
       key.set_metadata('Content-Type', content_type)

+    if content_encoding is not None:
+      key.set_metadata('Content-Encoding', content_encoding)
+
     key.set_contents_from_stream(fp)
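
Setting Content-Encoding: gzip as object metadata means the cloud provider returns that header with the object, so any HTTP client fetching the archive URL directly knows to decompress the body. A hedged boto sketch of the same metadata handling outside the multipart path (bucket name, credentials, and key path are placeholders, not values from this commit):

from cStringIO import StringIO
from gzip import GzipFile
from boto.s3.connection import S3Connection

# Build a tiny gzip payload to upload.
buf = StringIO()
with GzipFile(mode='wb', fileobj=buf) as zipstream:
  zipstream.write('{"logs": []}')

conn = S3Connection('access-key', 'secret-key')    # placeholder credentials
bucket = conn.get_bucket('logarchive-bucket')      # placeholder bucket name

key = bucket.new_key('logarchive/some-file-id')    # placeholder key path
key.set_metadata('Content-Type', 'application/json')
key.set_metadata('Content-Encoding', 'gzip')       # returned as an HTTP header on GET
key.set_contents_from_string(buf.getvalue())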

View file

@@ -14,7 +14,7 @@ class FakeStorage(BaseStorage):
   def stream_read(self, path):
     yield ''

-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     pass

   def remove(self, path):

View file

@@ -43,7 +43,7 @@ class LocalStorage(BaseStorage):
     path = self._init_path(path)
     return io.open(path, mode='rb')

-  def stream_write(self, path, fp, content_type=None):
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
     # Size is mandatory
     path = self._init_path(path, create=True)
     with open(path, mode='wb') as f:

View file

@@ -198,3 +198,11 @@ class TestBuildLogs(RedisBuildLogs):
       return None
     else:
       return super(TestBuildLogs, self).get_status(build_id)
+
+  def delete_log_entries(self, build_id):
+    if build_id == self.test_build_id:
+      return
+    if not self.allow_delegate:
+      return None
+    else:
+      return super(TestBuildLogs, self).delete_log_entries(build_id)

View file

@@ -3,10 +3,12 @@ import logging

 from apscheduler.schedulers.blocking import BlockingScheduler
 from peewee import fn
 from tempfile import SpooledTemporaryFile
+from gzip import GzipFile

 from data import model
-from data.database import configure, RepositoryBuild
-from app import app, build_logs, log_archive
+from data.archivedlogs import JSON_MIMETYPE
+from data.database import RepositoryBuild
+from app import build_logs, log_archive
 from util.streamingjsonencoder import StreamingJSONEncoder

 POLL_PERIOD_SECONDS = 30

@@ -14,7 +16,7 @@ POLL_PERIOD_SECONDS = 30
 logger = logging.getLogger(__name__)
 sched = BlockingScheduler()

-@sched.scheduled_job(trigger='interval', seconds=5)
+@sched.scheduled_job(trigger='interval', seconds=1)
 def archive_redis_buildlogs():
   """ Archive a single build, choosing a candidate at random. This process must be idempotent to
       avoid needing two-phase commit. """

@@ -30,8 +32,19 @@ def archive_redis_buildlogs():
       'logs': entries,
     }

-    for chunk in StreamingJSONEncoder().iterencode(to_encode):
-      print chunk
+    with SpooledTemporaryFile() as tempfile:
+      with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+        for chunk in StreamingJSONEncoder().iterencode(to_encode):
+          zipstream.write(chunk)
+
+      tempfile.seek(0)
+      log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                             file_id=to_archive.uuid)
+
+    to_archive.logs_archived = True
+    to_archive.save()
+
+    build_logs.delete_log_entries(to_archive.uuid)
   except RepositoryBuild.DoesNotExist:
     logger.debug('No more builds to archive')
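
A note on the SpooledTemporaryFile used by the worker above: it behaves like an in-memory buffer until a size threshold is crossed, then transparently rolls over to a real temporary file, so small build logs never touch the disk while very large ones cannot exhaust memory. A small sketch of that behavior (the 1 MB threshold and the explicit GzipFile mode are illustrative choices, not values taken from this commit):

from tempfile import SpooledTemporaryFile
from gzip import GzipFile

with SpooledTemporaryFile(max_size=1024 * 1024) as buf:  # stays in RAM under ~1 MB
  with GzipFile(mode='wb', fileobj=buf) as zipstream:
    zipstream.write('{"start": 0, "total": 0, "logs": []}')

  buf.seek(0)                  # rewind before handing the gzip bytes to store_file()
  archived_bytes = buf.read()  # gzip-encoded payload, ready for storage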