From 54fbb2a4c040c3430945747f6e08621620557dcb Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 8 Sep 2014 16:42:43 -0400 Subject: [PATCH 01/12] Rename collections to morecollections to avoid a conflict with the built in module. --- data/billing.py | 2 +- data/migrations/env.py | 2 +- util/{collections.py => morecollections.py} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename util/{collections.py => morecollections.py} (100%) diff --git a/data/billing.py b/data/billing.py index 8c604aac2..e1510c054 100644 --- a/data/billing.py +++ b/data/billing.py @@ -3,7 +3,7 @@ import stripe from datetime import datetime, timedelta from calendar import timegm -from util.collections import AttrDict +from util.morecollections import AttrDict PLANS = [ # Deprecated Plans diff --git a/data/migrations/env.py b/data/migrations/env.py index 863e3d98f..d64cf4ee7 100644 --- a/data/migrations/env.py +++ b/data/migrations/env.py @@ -8,7 +8,7 @@ from peewee import SqliteDatabase from data.database import all_models, db from app import app from data.model.sqlalchemybridge import gen_sqlalchemy_metadata -from util.collections import AttrDict +from util.morecollections import AttrDict # this is the Alembic Config object, which provides # access to the values within the .ini file in use. diff --git a/util/collections.py b/util/morecollections.py similarity index 100% rename from util/collections.py rename to util/morecollections.py From 451e034ca17428c81b4de37583845147b9b12f1f Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 8 Sep 2014 16:43:17 -0400 Subject: [PATCH 02/12] Archived logs commit 1. Squash me. --- app.py | 2 + config.py | 4 + data/archivedlogs.py | 39 +++++ data/database.py | 11 ++ data/model/legacy.py | 9 ++ data/userfiles.py | 12 +- util/streamingjsonencoder.py | 267 +++++++++++++++++++++++++++++++++++ workers/buildlogsarchiver.py | 42 ++++++ workers/dockerfilebuild.py | 38 ++--- 9 files changed, 402 insertions(+), 22 deletions(-) create mode 100644 data/archivedlogs.py create mode 100644 util/streamingjsonencoder.py create mode 100644 workers/buildlogsarchiver.py diff --git a/app.py b/app.py index 81c59a30c..e03480782 100644 --- a/app.py +++ b/app.py @@ -20,6 +20,7 @@ from util.exceptionlog import Sentry from util.queuemetrics import QueueMetrics from data.billing import Billing from data.buildlogs import BuildLogs +from data.archivedlogs import LogArchive from data.queue import WorkQueue from data.userevent import UserEventsBuilderModule from datetime import datetime @@ -89,6 +90,7 @@ login_manager = LoginManager(app) mail = Mail(app) storage = Storage(app) userfiles = Userfiles(app) +log_archive = LogArchive(app) analytics = Analytics(app) billing = Billing(app) sentry = Sentry(app) diff --git a/config.py b/config.py index f797cb36a..1355aa2a9 100644 --- a/config.py +++ b/config.py @@ -83,6 +83,10 @@ class DefaultConfig(object): BUILDLOGS_REDIS_HOSTNAME = 'logs.quay.io' BUILDLOGS_OPTIONS = [] + # Build logs archive + LOG_ARCHIVE_TYPE = 'LocalArchivedLogs' + LOG_ARCHIVE_PATH = 'test/data/registry/logarchive' + # Real-time user events USER_EVENTS_REDIS_HOSTNAME = 'logs.quay.io' diff --git a/data/archivedlogs.py b/data/archivedlogs.py new file mode 100644 index 000000000..0801b8815 --- /dev/null +++ b/data/archivedlogs.py @@ -0,0 +1,39 @@ +from data.userfiles import LocalUserfiles, UserfilesHandlers, S3Userfiles, FakeUserfiles + +class LogArchive(object): + def __init__(self, app=None): + self.app = app + if app is not None: + self.state = self.init_app(app) + else: + self.state 
= None + + def init_app(self, app): + storage_type = app.config.get('LOG_ARCHIVE_TYPE', 'LocalArchivedLogs') + path = app.config.get('LOG_ARCHIVE_PATH', '') + + if storage_type == 'LocalArchivedLogs': + archive = LocalUserfiles(app, path) + app.add_url_rule('/archivedlogs/', + view_func=UserfilesHandlers.as_view('log_archive_handlers', + local_userfiles=archive)) + + elif storage_type == 'S3ArchivedLogs': + access_key = app.config.get('LOG_ARCHIVE_AWS_ACCESS_KEY', '') + secret_key = app.config.get('LOG_ARCHIVE_AWS_SECRET_KEY', '') + bucket = app.config.get('LOG_ARCHIVE_S3_BUCKET', '') + archive = S3Userfiles(path, access_key, secret_key, bucket) + + elif storage_type == 'FakeArchivedLogs': + archive = FakeUserfiles() + + else: + raise RuntimeError('Unknown log archive type: %s' % storage_type) + + # register extension with app + app.extensions = getattr(app, 'extensions', {}) + app.extensions['log_archive'] = archive + return archive + + def __getattr__(self, name): + return getattr(self.state, name, None) diff --git a/data/database.py b/data/database.py index 4731a06bb..96e85a7d2 100644 --- a/data/database.py +++ b/data/database.py @@ -289,6 +289,16 @@ class RepositoryTag(BaseModel): ) +class BUILD_PHASE(object): + """ Build phases enum """ + ERROR = 'error' + UNPACKING = 'unpacking' + PULLING = 'pulling' + BUILDING = 'building' + PUSHING = 'pushing' + COMPLETE = 'complete' + + class RepositoryBuild(BaseModel): uuid = CharField(default=uuid_generator, index=True) repository = ForeignKeyField(Repository, index=True) @@ -300,6 +310,7 @@ class RepositoryBuild(BaseModel): display_name = CharField() trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True) pull_robot = ForeignKeyField(User, null=True, related_name='buildpullrobot') + logs_archived = BooleanField(default=False) class QueueItem(BaseModel): diff --git a/data/model/legacy.py b/data/model/legacy.py index 64bcdc860..92a130dca 100644 --- a/data/model/legacy.py +++ b/data/model/legacy.py @@ -12,6 +12,7 @@ from util.backoff import exponential_backoff EXPONENTIAL_BACKOFF_SCALE = timedelta(seconds=1) +PRESUMED_DEAD_BUILD_AGE = timedelta(days=15) logger = logging.getLogger(__name__) @@ -1877,3 +1878,11 @@ def confirm_email_authorization_for_repo(code): found.save() return found + + +def archivable_buildlogs_query(): + presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE + return (RepositoryBuild.select() + .where((RepositoryBuild.phase == BUILD_PHASE.COMPLETE) | + (RepositoryBuild.phase == BUILD_PHASE.ERROR) | + (RepositoryBuild.started < presumed_dead_date), RepositoryBuild.logs_archived == False)) diff --git a/data/userfiles.py b/data/userfiles.py index 79fbcb507..c12553493 100644 --- a/data/userfiles.py +++ b/data/userfiles.py @@ -58,9 +58,12 @@ class S3Userfiles(object): encrypt_key=True) return (url, file_id) - def store_file(self, file_like_obj, content_type): + def store_file(self, file_like_obj, content_type, file_id=None): self._initialize_s3() - file_id = str(uuid4()) + + if file_id is None: + file_id = str(uuid4()) + full_key = os.path.join(self._prefix, file_id) k = Key(self._bucket, full_key) logger.debug('Setting s3 content type to: %s' % content_type) @@ -161,8 +164,9 @@ class LocalUserfiles(object): except IOError: break - def store_file(self, file_like_obj, content_type): - file_id = str(uuid4()) + def store_file(self, file_like_obj, content_type, file_id=None): + if file_id is None: + file_id = str(uuid4()) # Rewind the file to match what s3 does file_like_obj.seek(0, os.SEEK_SET) diff --git 
a/util/streamingjsonencoder.py b/util/streamingjsonencoder.py new file mode 100644 index 000000000..f51a4ec9b --- /dev/null +++ b/util/streamingjsonencoder.py @@ -0,0 +1,267 @@ +# Adapted from https://gist.github.com/akaihola/1415730#file-streamingjson-py + +# Copyright (c) Django Software Foundation and individual contributors. +# All rights reserved. + +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: + +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. + +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. + +# 3. Neither the name of Django nor the names of its contributors may be used +# to endorse or promote products derived from this software without +# specific prior written permission. + +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import collections +import json +from json.encoder import encode_basestring, encode_basestring_ascii, FLOAT_REPR, INFINITY +from types import GeneratorType + + +class StreamingJSONEncoder(json.JSONEncoder): + def iterencode(self, o, _one_shot=False): + """Encode the given object and yield each string + representation as available. + + For example:: + + for chunk in StreamingJSONEncoder().iterencode(bigobject): + mysocket.write(chunk) + + This method is a verbatim copy of + :meth:`json.JSONEncoder.iterencode`. It is + needed because we need to call our patched + :func:`streamingjsonencoder._make_iterencode`. + """ + if self.check_circular: + markers = {} + else: + markers = None + if self.ensure_ascii: + _encoder = encode_basestring_ascii + else: + _encoder = encode_basestring + if self.encoding != 'utf-8': + def _encoder(o, _orig_encoder=_encoder, _encoding=self.encoding): + if isinstance(o, str): + o = o.decode(_encoding) + return _orig_encoder(o) + + def floatstr(o, allow_nan=self.allow_nan, _repr=FLOAT_REPR, _inf=INFINITY, _neginf=-INFINITY): + # Check for specials. Note that this type of test is processor- and/or + # platform-specific, so do tests which don't depend on the internals. 
+ + if o != o: + text = 'NaN' + elif o == _inf: + text = 'Infinity' + elif o == _neginf: + text = '-Infinity' + else: + return _repr(o) + + if not allow_nan: + raise ValueError("Out of range float values are not JSON compliant: %r" + % (o,)) + + return text + + _iterencode = _make_iterencode( + markers, self.default, _encoder, self.indent, floatstr, + self.key_separator, self.item_separator, self.sort_keys, + self.skipkeys, _one_shot) + return _iterencode(o, 0) + + +def _make_iterencode(markers, _default, _encoder, _indent, _floatstr, _key_separator, + _item_separator, _sort_keys, _skipkeys, _one_shot, False=False, True=True, + ValueError=ValueError, basestring=basestring, dict=dict, float=float, + GeneratorType=GeneratorType, id=id, int=int, isinstance=isinstance, list=list, + long=long, str=str, tuple=tuple): + """ + This is a patched version of + :func:`django.utils.simplejson.encoder.iterencode`. Whenever it encounters + a generator in the data structure, it encodes it as a JSON list. + """ + def _iterencode_list(lst, _current_indent_level): + if not lst: + # note: empty generators aren't caught here, see below + yield '[]' + return + if markers is not None: + markerid = id(lst) + if markerid in markers: + raise ValueError("Circular reference detected") + markers[markerid] = lst + buf = '[' + if _indent is not None: + _current_indent_level += 1 + newline_indent = '\n' + (' ' * (_indent * _current_indent_level)) + separator = _item_separator + newline_indent + buf += newline_indent + else: + newline_indent = None + separator = _item_separator + first = True + for value in lst: + if first: + first = False + else: + buf = separator + if isinstance(value, basestring): + yield buf + _encoder(value) + elif value is None: + yield buf + 'null' + elif value is True: + yield buf + 'true' + elif value is False: + yield buf + 'false' + elif isinstance(value, (int, long)): + yield buf + str(value) + elif isinstance(value, float): + yield buf + _floatstr(value) + else: + yield buf + if isinstance(value, (list, tuple, GeneratorType)): + chunks = _iterencode_list(value, _current_indent_level) + elif isinstance(value, dict): + chunks = _iterencode_dict(value, _current_indent_level) + else: + chunks = _iterencode(value, _current_indent_level) + for chunk in chunks: + yield chunk + if first: + # we had an empty generator + yield buf + if newline_indent is not None: + _current_indent_level -= 1 + yield '\n' + (' ' * (_indent * _current_indent_level)) + yield ']' + if markers is not None: + del markers[markerid] + + def _iterencode_dict(dct, _current_indent_level): + if not dct: + yield '{}' + return + if markers is not None: + markerid = id(dct) + if markerid in markers: + raise ValueError("Circular reference detected") + markers[markerid] = dct + yield '{' + if _indent is not None: + _current_indent_level += 1 + newline_indent = '\n' + (' ' * (_indent * _current_indent_level)) + item_separator = _item_separator + newline_indent + yield newline_indent + else: + newline_indent = None + item_separator = _item_separator + first = True + if _sort_keys: + items = dct.items() + items.sort(key=lambda kv: kv[0]) + else: + items = dct.iteritems() + for key, value in items: + if isinstance(key, basestring): + pass + # JavaScript is weakly typed for these, so it makes sense to + # also allow them. Many encoders seem to do something like this. 
+ elif isinstance(key, float): + key = _floatstr(key) + elif isinstance(key, (int, long)): + key = str(key) + elif key is True: + key = 'true' + elif key is False: + key = 'false' + elif key is None: + key = 'null' + elif _skipkeys: + continue + else: + raise TypeError("key %r is not a string" % (key,)) + if first: + first = False + else: + yield item_separator + yield _encoder(key) + yield _key_separator + if isinstance(value, basestring): + yield _encoder(value) + elif value is None: + yield 'null' + elif value is True: + yield 'true' + elif value is False: + yield 'false' + elif isinstance(value, (int, long)): + yield str(value) + elif isinstance(value, float): + yield _floatstr(value) + else: + if isinstance(value, collections.Mapping): + chunks = _iterencode_dict(value, _current_indent_level) + elif isinstance(value, collections.Iterable): + chunks = _iterencode_list(value, _current_indent_level) + else: + chunks = _iterencode(value, _current_indent_level) + for chunk in chunks: + yield chunk + if newline_indent is not None: + _current_indent_level -= 1 + yield '\n' + (' ' * (_indent * _current_indent_level)) + yield '}' + if markers is not None: + del markers[markerid] + + def _iterencode(o, _current_indent_level): + if isinstance(o, basestring): + yield _encoder(o) + elif o is None: + yield 'null' + elif o is True: + yield 'true' + elif o is False: + yield 'false' + elif isinstance(o, (int, long)): + yield str(o) + elif isinstance(o, float): + yield _floatstr(o) + elif isinstance(o, collections.Mapping): + for chunk in _iterencode_dict(o, _current_indent_level): + yield chunk + elif isinstance(o, collections.Iterable): + for chunk in _iterencode_list(o, _current_indent_level): + yield chunk + else: + if markers is not None: + markerid = id(o) + if markerid in markers: + raise ValueError("Circular reference detected") + markers[markerid] = o + o = _default(o) + for chunk in _iterencode(o, _current_indent_level): + yield chunk + if markers is not None: + del markers[markerid] + + return _iterencode \ No newline at end of file diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py new file mode 100644 index 000000000..c11131890 --- /dev/null +++ b/workers/buildlogsarchiver.py @@ -0,0 +1,42 @@ +import logging + +from apscheduler.schedulers.blocking import BlockingScheduler +from peewee import fn +from tempfile import SpooledTemporaryFile + +from data import model +from data.database import configure, RepositoryBuild +from app import app, build_logs, log_archive +from util.streamingjsonencoder import StreamingJSONEncoder + +POLL_PERIOD_SECONDS = 30 + +logger = logging.getLogger(__name__) +sched = BlockingScheduler() + +@sched.scheduled_job(trigger='interval', seconds=5) +def archive_redis_buildlogs(): + """ Archive a single build, choosing a candidate at random. This process must be idempotent to + avoid needing two-phase commit. 
""" + try: + # Get a random build to archive + to_archive = model.archivable_buildlogs_query().order_by(fn.Random()).get() + logger.debug('Archiving: %s', to_archive.uuid) + + length, entries = build_logs.get_log_entries(to_archive.uuid, 0) + to_encode = { + 'start': 0, + 'total': length, + 'logs': entries, + } + + for chunk in StreamingJSONEncoder().iterencode(to_encode): + print chunk + + except RepositoryBuild.DoesNotExist: + logger.debug('No more builds to archive') + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + sched.start() diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py index b373a00a9..b9dfa99c1 100644 --- a/workers/dockerfilebuild.py +++ b/workers/dockerfilebuild.py @@ -1,6 +1,7 @@ import logging.config -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) import logging import argparse @@ -23,6 +24,7 @@ from collections import defaultdict from requests.exceptions import ConnectionError from data import model +from data.database import BUILD_PHASE from workers.worker import Worker, WorkerUnhealthyException, JobException from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue from endpoints.notificationhelper import spawn_notification @@ -545,7 +547,7 @@ class DockerfileBuildWorker(Worker): if c_type not in self._mime_processors: log_appender('error', build_logs.PHASE) - repository_build.phase = 'error' + repository_build.phase = BUILD_PHASE.ERROR repository_build.save() message = 'Unknown mime-type: %s' % c_type log_appender(message, build_logs.ERROR) @@ -554,7 +556,7 @@ class DockerfileBuildWorker(Worker): # Try to build the build directory package from the buildpack. log_appender('unpacking', build_logs.PHASE) - repository_build.phase = 'unpacking' + repository_build.phase = BUILD_PHASE.UNPACKING repository_build.save() build_dir = None @@ -572,20 +574,20 @@ class DockerfileBuildWorker(Worker): repository_build.uuid, self._cache_size_gb, pull_credentials) as build_ctxt: log_appender('pulling', build_logs.PHASE) - repository_build.phase = 'pulling' + repository_build.phase = BUILD_PHASE.PULLING repository_build.save() build_ctxt.pull() self.extend_processing(RESERVATION_TIME) log_appender('building', build_logs.PHASE) - repository_build.phase = 'building' + repository_build.phase = BUILD_PHASE.BUILDING repository_build.save() built_image = build_ctxt.build(self.extend_processing) if not built_image: log_appender('error', build_logs.PHASE) - repository_build.phase = 'error' + repository_build.phase = BUILD_PHASE.ERROR repository_build.save() message = 'Unable to build dockerfile.' @@ -598,13 +600,13 @@ class DockerfileBuildWorker(Worker): self.extend_processing(RESERVATION_TIME) log_appender('pushing', build_logs.PHASE) - repository_build.phase = 'pushing' + repository_build.phase = BUILD_PHASE.PUSHING repository_build.save() build_ctxt.push(built_image) log_appender('complete', build_logs.PHASE) - repository_build.phase = 'complete' + repository_build.phase = BUILD_PHASE.COMPLETE repository_build.save() # Spawn a notification that the build has completed. 
@@ -641,20 +643,20 @@ class DockerfileBuildWorker(Worker): sentry.client.captureException() log_appender('error', build_logs.PHASE) logger.exception('Exception when processing request.') - repository_build.phase = 'error' + repository_build.phase = BUILD_PHASE.ERROR repository_build.save() log_appender(str(exc), build_logs.ERROR) # Raise the exception to the queue. raise JobException(str(exc)) +if __name__ == "__main__": + desc = 'Worker daemon to monitor dockerfile build' + parser = argparse.ArgumentParser(description=desc) + parser.add_argument('--cachegb', default=20, type=float, + help='Maximum cache size in gigabytes.') + args = parser.parse_args() -desc = 'Worker daemon to monitor dockerfile build' -parser = argparse.ArgumentParser(description=desc) -parser.add_argument('--cachegb', default=20, type=float, - help='Maximum cache size in gigabytes.') -args = parser.parse_args() - -worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, - reservation_seconds=RESERVATION_TIME) -worker.start(start_status_server_port=8000) + worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, + reservation_seconds=RESERVATION_TIME) + worker.start(start_status_server_port=8000) From 8b3a3178b0f69ef53c88fe5c10cc7300edfbe960 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Thu, 11 Sep 2014 15:33:10 -0400 Subject: [PATCH 03/12] Finish the build logs archiver, add handlers for cloud and local that handle gzip encoded archived content. --- data/archivedlogs.py | 33 +++++++++++++++++++++++++++++---- data/buildlogs.py | 7 +++++++ data/userfiles.py | 5 +++-- endpoints/api/build.py | 8 ++++++-- storage/basestorage.py | 2 +- storage/cloud.py | 10 ++++++++-- storage/fakestorage.py | 2 +- storage/local.py | 2 +- test/testlogs.py | 8 ++++++++ workers/buildlogsarchiver.py | 23 ++++++++++++++++++----- 10 files changed, 82 insertions(+), 18 deletions(-) diff --git a/data/archivedlogs.py b/data/archivedlogs.py index bde3e9151..e190b9782 100644 --- a/data/archivedlogs.py +++ b/data/archivedlogs.py @@ -1,5 +1,30 @@ +import logging + +from gzip import GzipFile +from flask import send_file, abort +from cStringIO import StringIO + from data.userfiles import DelegateUserfiles, UserfilesHandlers + +JSON_MIMETYPE = 'application/json' + + +logger = logging.getLogger(__name__) + + +class LogArchiveHandlers(UserfilesHandlers): + def get(self, file_id): + path = self._files.get_file_id_path(file_id) + try: + with self._storage.stream_read_file(self._locations, path) as gzip_stream: + with GzipFile(fileobj=gzip_stream) as unzipped: + unzipped_buffer = StringIO(unzipped.read()) + return send_file(unzipped_buffer, mimetype=JSON_MIMETYPE) + except IOError: + abort(404) + + class LogArchive(object): def __init__(self, app=None, distributed_storage=None): self.app = app @@ -17,10 +42,10 @@ class LogArchive(object): log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name) app.add_url_rule('/logarchive/', - view_func=UserfilesHandlers.as_view(handler_name, - distributed_storage=distributed_storage, - location=location, - files=log_archive)) + view_func=LogArchiveHandlers.as_view(handler_name, + distributed_storage=distributed_storage, + location=location, + files=log_archive)) # register extension with app app.extensions = getattr(app, 'extensions', {}) diff --git a/data/buildlogs.py b/data/buildlogs.py index 2ccd03899..4fdd24e72 100644 --- a/data/buildlogs.py +++ b/data/buildlogs.py @@ -51,6 +51,13 @@ class RedisBuildLogs(object): except redis.ConnectionError: raise 
BuildStatusRetrievalError('Cannot retrieve build logs') + def delete_log_entries(self, build_id): + """ + Deletes the logs and status keys completely. + """ + self._redis.delete(self._logs_key(build_id)) + + @staticmethod def _status_key(build_id): return 'builds/%s/status' % build_id diff --git a/data/userfiles.py b/data/userfiles.py index 8e7227e01..f4b786df5 100644 --- a/data/userfiles.py +++ b/data/userfiles.py @@ -81,12 +81,13 @@ class DelegateUserfiles(object): return (url, file_id) - def store_file(self, file_like_obj, content_type, file_id=None): + def store_file(self, file_like_obj, content_type, content_encoding=None, file_id=None): if file_id is None: file_id = str(uuid4()) path = self.get_file_id_path(file_id) - self._storage.stream_write(self._locations, path, file_like_obj, content_type) + self._storage.stream_write(self._locations, path, file_like_obj, content_type, + content_encoding) return file_id def get_file_url(self, file_id, expires_in=300, requires_cors=False): diff --git a/endpoints/api/build.py b/endpoints/api/build.py index 74677fadb..d792234dd 100644 --- a/endpoints/api/build.py +++ b/endpoints/api/build.py @@ -1,9 +1,9 @@ import logging import json -from flask import request +from flask import request, redirect -from app import app, userfiles as user_files, build_logs +from app import app, userfiles as user_files, build_logs, log_archive from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource, require_repo_read, require_repo_write, validate_json_request, ApiResource, internal_only, format_date, api, Unauthorized, NotFound) @@ -215,6 +215,10 @@ class RepositoryBuildLogs(RepositoryParamResource): build = model.get_repository_build(namespace, repository, build_uuid) + # If the logs have been archived, just redirect to the completed archive + if build.logs_archived: + return redirect(log_archive.get_file_url(build.uuid)) + start = int(request.args.get('start', 0)) try: diff --git a/storage/basestorage.py b/storage/basestorage.py index 78d49aa1f..675c3a738 100644 --- a/storage/basestorage.py +++ b/storage/basestorage.py @@ -75,7 +75,7 @@ class BaseStorage(StoragePaths): def stream_read_file(self, path): raise NotImplementedError - def stream_write(self, path, fp, content_type=None): + def stream_write(self, path, fp, content_type=None, content_encoding=None): raise NotImplementedError def list_directory(self, path=None): diff --git a/storage/cloud.py b/storage/cloud.py index f7d922d6c..824c57534 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -128,7 +128,7 @@ class _CloudStorage(BaseStorage): raise IOError('No such key: \'{0}\''.format(path)) return StreamReadKeyAsFile(key) - def stream_write(self, path, fp, content_type=None): + def stream_write(self, path, fp, content_type=None, content_encoding=None): # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() buffer_size = 5 * 1024 * 1024 @@ -140,6 +140,9 @@ class _CloudStorage(BaseStorage): if content_type is not None: metadata['Content-Type'] = content_type + if content_encoding is not None: + metadata['Content-Encoding'] = content_encoding + mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, **self._upload_params) num_part = 1 @@ -224,7 +227,7 @@ class GoogleCloudStorage(_CloudStorage): connect_kwargs, upload_params, storage_path, access_key, secret_key, bucket_name) - def stream_write(self, path, fp, content_type=None): + def stream_write(self, path, fp, content_type=None, content_encoding=None): # Minimum size of upload 
part size on S3 is 5MB self._initialize_cloud_conn() path = self._init_path(path) @@ -233,6 +236,9 @@ class GoogleCloudStorage(_CloudStorage): if content_type is not None: key.set_metadata('Content-Type', content_type) + if content_encoding is not None: + key.set_metadata('Content-Encoding', content_encoding) + key.set_contents_from_stream(fp) diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 232f5af24..cebed1e80 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -14,7 +14,7 @@ class FakeStorage(BaseStorage): def stream_read(self, path): yield '' - def stream_write(self, path, fp, content_type=None): + def stream_write(self, path, fp, content_type=None, content_encoding=None): pass def remove(self, path): diff --git a/storage/local.py b/storage/local.py index 987431e33..056c68c05 100644 --- a/storage/local.py +++ b/storage/local.py @@ -43,7 +43,7 @@ class LocalStorage(BaseStorage): path = self._init_path(path) return io.open(path, mode='rb') - def stream_write(self, path, fp, content_type=None): + def stream_write(self, path, fp, content_type=None, content_encoding=None): # Size is mandatory path = self._init_path(path, create=True) with open(path, mode='wb') as f: diff --git a/test/testlogs.py b/test/testlogs.py index 27fe7c47b..023fd159d 100644 --- a/test/testlogs.py +++ b/test/testlogs.py @@ -198,3 +198,11 @@ class TestBuildLogs(RedisBuildLogs): return None else: return super(TestBuildLogs, self).get_status(build_id) + + def delete_log_entries(self, build_id): + if build_id == self.test_build_id: + return + if not self.allow_delegate: + return None + else: + return super(TestBuildLogs, self).delete_log_entries(build_id) diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py index c11131890..c51666d73 100644 --- a/workers/buildlogsarchiver.py +++ b/workers/buildlogsarchiver.py @@ -3,10 +3,12 @@ import logging from apscheduler.schedulers.blocking import BlockingScheduler from peewee import fn from tempfile import SpooledTemporaryFile +from gzip import GzipFile from data import model -from data.database import configure, RepositoryBuild -from app import app, build_logs, log_archive +from data.archivedlogs import JSON_MIMETYPE +from data.database import RepositoryBuild +from app import build_logs, log_archive from util.streamingjsonencoder import StreamingJSONEncoder POLL_PERIOD_SECONDS = 30 @@ -14,7 +16,7 @@ POLL_PERIOD_SECONDS = 30 logger = logging.getLogger(__name__) sched = BlockingScheduler() -@sched.scheduled_job(trigger='interval', seconds=5) +@sched.scheduled_job(trigger='interval', seconds=1) def archive_redis_buildlogs(): """ Archive a single build, choosing a candidate at random. This process must be idempotent to avoid needing two-phase commit. 
""" @@ -30,8 +32,19 @@ def archive_redis_buildlogs(): 'logs': entries, } - for chunk in StreamingJSONEncoder().iterencode(to_encode): - print chunk + with SpooledTemporaryFile() as tempfile: + with GzipFile('testarchive', fileobj=tempfile) as zipstream: + for chunk in StreamingJSONEncoder().iterencode(to_encode): + zipstream.write(chunk) + + tempfile.seek(0) + log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', + file_id=to_archive.uuid) + + to_archive.logs_archived = True + to_archive.save() + + build_logs.delete_log_entries(to_archive.uuid) except RepositoryBuild.DoesNotExist: logger.debug('No more builds to archive') From 8a94e38028d2b2205ce3dcfde06d3cda81ba642a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 11 Sep 2014 19:59:44 -0400 Subject: [PATCH 04/12] - Add an AngularPollChannel class for easier handling of HTTP polling. - Convert the build view page over to use the new class - Add code so that if the builds logs returned by the API start in the set we already have, we only add the new ones --- static/js/app.js | 72 +++++++++++++++++++++++++ static/js/controllers.js | 93 +++++++++++++-------------------- static/partials/repo-build.html | 2 +- 3 files changed, 110 insertions(+), 57 deletions(-) diff --git a/static/js/app.js b/static/js/app.js index 26b8a4be1..b5a4b3ce1 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -215,6 +215,78 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading return service; }]); + /** + * Specialized class for conducting an HTTP poll, while properly preventing multiple calls. + */ + $provide.factory('AngularPollChannel', ['ApiService', '$timeout', function(ApiService, $timeout) { + var _PollChannel = function(scope, requester, opt_sleeptime) { + this.scope_ = scope; + this.requester_ = requester; + this.sleeptime_ = opt_sleeptime || (60 * 1000 /* 60s */); + this.timer_ = null; + + this.working = false; + this.polling = false; + + var that = this; + scope.$on('$destroy', function() { + that.stop(); + }); + }; + + _PollChannel.prototype.stop = function() { + if (this.timer_) { + $timeout.cancel(this.timer_); + this.timer_ = null; + this.polling_ = false; + } + + this.working = false; + }; + + _PollChannel.prototype.start = function() { + // Make sure we invoke call outside the normal digest cycle, since + // we'll call $scope.$apply ourselves. 
+ var that = this; + setTimeout(function() { that.call_(); }, 0); + }; + + _PollChannel.prototype.call_ = function() { + if (this.working) { return; } + + var that = this; + this.working = true; + this.scope_.$apply(function() { + that.requester_(function(status) { + if (status) { + that.working = false; + that.setupTimer_(); + } else { + that.stop(); + } + }); + }); + }; + + _PollChannel.prototype.setupTimer_ = function() { + if (this.timer_) { return; } + + var that = this; + this.polling = true; + this.timer_ = $timeout(function() { + that.timer_ = null; + that.call_(); + }, this.sleeptime_) + }; + + var service = { + 'create': function(scope, requester, opt_sleeptime) { + return new _PollChannel(scope, requester, opt_sleeptime); + } + }; + + return service; + }]); $provide.factory('DataFileService', [function() { var dataFileService = {}; diff --git a/static/js/controllers.js b/static/js/controllers.js index 9131a0140..f259ead68 100644 --- a/static/js/controllers.js +++ b/static/js/controllers.js @@ -978,14 +978,9 @@ function BuildPackageCtrl($scope, Restangular, ApiService, DataFileService, $rou } function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope, $location, $interval, $sanitize, - ansi2html, AngularViewArray) { + ansi2html, AngularViewArray, AngularPollChannel) { var namespace = $routeParams.namespace; var name = $routeParams.name; - var pollTimerHandle = null; - - $scope.$on('$destroy', function() { - stopPollTimer(); - }); // Watch for changes to the current parameter. $scope.$on('$routeUpdate', function(){ @@ -995,8 +990,7 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope }); $scope.builds = null; - $scope.polling = false; - + $scope.pollChannel = null; $scope.buildDialogShowCounter = 0; $scope.showNewBuildDialog = function() { @@ -1081,8 +1075,6 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope $scope.setCurrentBuildInternal = function(index, build, opt_updateURL) { if (build == $scope.currentBuild) { return; } - stopPollTimer(); - $scope.logEntries = null; $scope.logStartIndex = null; $scope.currentParentEntry = null; @@ -1103,47 +1095,35 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope $scope.adjustLogHeight(); }, 1); - // Load the first set of logs. - getBuildStatusAndLogs(); - - // If the build is currently processing, start the build timer. - checkPollTimer(); - }; - - var checkPollTimer = function() { - var build = $scope.currentBuild; - if (!build) { - stopPollTimer(); - return; + // Stop any existing polling. + if ($scope.pollChannel) { + $scope.pollChannel.stop(); } + + // Create a new channel for polling the build status and logs. 
+ var conductStatusAndLogRequest = function(callback) { + getBuildStatusAndLogs(build, callback); + }; - if (build['phase'] != 'complete' && build['phase'] != 'error') { - startPollTimer(); - return true; - } else { - stopPollTimer(); - return false; - } + $scope.pollChannel = AngularPollChannel.create($scope, conductStatusAndLogRequest, 5 * 1000 /* 5s */); + $scope.pollChannel.start(); }; - var stopPollTimer = function() { - $interval.cancel(pollTimerHandle); - }; - - var startPollTimer = function() { - stopPollTimer(); - pollTimerHandle = $interval(getBuildStatusAndLogs, 2000); - }; - - var processLogs = function(logs, startIndex) { + var processLogs = function(logs, startIndex, endIndex) { if (!$scope.logEntries) { $scope.logEntries = []; } + // If the start index given is less than that requested, then we've received a larger + // pool of logs, and we need to only consider the new ones. + if (startIndex < $scope.logStartIndex) { + logs = logs.slice($scope.logStartIndex - startIndex); + } + for (var i = 0; i < logs.length; ++i) { var entry = logs[i]; var type = entry['type'] || 'entry'; if (type == 'command' || type == 'phase' || type == 'error') { entry['logs'] = AngularViewArray.create(); - entry['index'] = startIndex + i; + entry['index'] = $scope.logStartIndex + i; $scope.logEntries.push(entry); $scope.currentParentEntry = entry; @@ -1151,18 +1131,19 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope $scope.currentParentEntry['logs'].push(entry); } } + + return endIndex; }; - var getBuildStatusAndLogs = function() { - if (!$scope.currentBuild || $scope.polling) { return; } - $scope.polling = true; - + var getBuildStatusAndLogs = function(build, callback) { var params = { 'repository': namespace + '/' + name, - 'build_uuid': $scope.currentBuild.id + 'build_uuid': build.id }; ApiService.getRepoBuildStatus(null, params, true).then(function(resp) { + if (build != $scope.currentBuild) { callback(false); return; } + // Note: We use extend here rather than replacing as Angular is depending on the // root build object to remain the same object. var matchingBuilds = $.grep($scope.builds, function(elem) { @@ -1177,22 +1158,16 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope $scope.builds.push(currentBuild); } - checkPollTimer(); - // Load the updated logs for the build. var options = { 'start': $scope.logStartIndex }; - ApiService.getRepoBuildLogsAsResource(params, true).withOptions(options).get(function(resp) { - if ($scope.logStartIndex != null && resp['start'] != $scope.logStartIndex) { - $scope.polling = false; - return; - } + ApiService.getRepoBuildLogsAsResource(params, true).withOptions(options).get(function(resp) { + if (build != $scope.currentBuild) { callback(false); return; } - processLogs(resp['logs'], resp['start']); - $scope.logStartIndex = resp['total']; - $scope.polling = false; + // Process the logs we've received. + $scope.logStartIndex = processLogs(resp['logs'], resp['start'], resp['total']); // If the build status is an error, open the last two log entries. if (currentBuild['phase'] == 'error' && $scope.logEntries.length > 1) { @@ -1205,9 +1180,15 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope openLogEntries($scope.logEntries[$scope.logEntries.length - 2]); openLogEntries($scope.logEntries[$scope.logEntries.length - 1]); } + + // If the build phase is an error or a complete, then we mark the channel + // as closed. 
+ callback(currentBuild['phase'] != 'error' && currentBuild['phase'] != 'complete'); }, function() { - $scope.polling = false; + callback(false); }); + }, function() { + callback(false); }); }; diff --git a/static/partials/repo-build.html b/static/partials/repo-build.html index 3afe87508..214078a03 100644 --- a/static/partials/repo-build.html +++ b/static/partials/repo-build.html @@ -94,7 +94,7 @@
- +
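
The heart of patch 02 is util/streamingjsonencoder.py: unlike the stock json encoder, its patched _make_iterencode treats generators as JSON lists and emits output chunk by chunk, which is what lets the archiver serialize an arbitrarily long stream of Redis log entries without holding them all in memory. Below is an illustrative usage sketch, not part of the series itself, written against the module as introduced above and the Python 2 runtime the series targets (xrange, byte strings):

    import sys
    from util.streamingjsonencoder import StreamingJSONEncoder

    def log_entries():
        # Stand-in for build_logs.get_log_entries(); yields entries one at a time.
        for i in xrange(3):
            yield {'message': 'line %d' % i}

    payload = {'start': 0, 'logs': log_entries()}

    # iterencode() yields JSON text incrementally; the generator is consumed lazily,
    # so the full log list never needs to exist in memory at once.
    for chunk in StreamingJSONEncoder().iterencode(payload):
        sys.stdout.write(chunk)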
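
Patch 03 completes the pipeline: archive_redis_buildlogs() gzips the streamed JSON into a SpooledTemporaryFile and hands it to log_archive.store_file() with content_encoding='gzip', and LogArchiveHandlers.get() later decompresses the stored object and serves it as application/json. The following is a minimal, self-contained sketch of that round trip using only the standard library (Python 2, like the rest of the series), with an in-memory byte string standing in for the storage backend:

    import gzip
    import json
    from cStringIO import StringIO
    from tempfile import SpooledTemporaryFile

    to_encode = {'start': 0, 'total': 2,
                 'logs': [{'message': 'pulling'}, {'message': 'building'}]}

    # Write side, as in archive_redis_buildlogs(): stream the encoded JSON through
    # gzip into a spooled temporary file, then rewind it before handing it off.
    with SpooledTemporaryFile() as tempfile:
        with gzip.GzipFile('archive', mode='wb', fileobj=tempfile) as zipstream:
            for chunk in json.JSONEncoder().iterencode(to_encode):
                zipstream.write(chunk)
        tempfile.seek(0)
        stored_blob = tempfile.read()  # store_file() uploads this with content_encoding='gzip'

    # Read side, as in LogArchiveHandlers.get(): decompress and hand back the JSON document.
    with gzip.GzipFile(fileobj=StringIO(stored_blob)) as unzipped:
        restored = json.loads(unzipped.read())
    assert restored['logs'][1]['message'] == 'building'

Because the worker stores the archive under file_id=to_archive.uuid and only flips logs_archived and deletes the Redis keys after the upload, re-running the job on the same build simply rewrites the same object, which is how the docstring's promise of idempotence without two-phase commit holds up.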