diff --git a/app.py b/app.py
index 81c59a30c..e03480782 100644
--- a/app.py
+++ b/app.py
@@ -20,6 +20,7 @@ from util.exceptionlog import Sentry
 from util.queuemetrics import QueueMetrics
 from data.billing import Billing
 from data.buildlogs import BuildLogs
+from data.archivedlogs import LogArchive
 from data.queue import WorkQueue
 from data.userevent import UserEventsBuilderModule
 from datetime import datetime
@@ -89,6 +90,7 @@ login_manager = LoginManager(app)
 mail = Mail(app)
 storage = Storage(app)
 userfiles = Userfiles(app)
+log_archive = LogArchive(app)
 analytics = Analytics(app)
 billing = Billing(app)
 sentry = Sentry(app)
diff --git a/config.py b/config.py
index f797cb36a..1355aa2a9 100644
--- a/config.py
+++ b/config.py
@@ -83,6 +83,10 @@ class DefaultConfig(object):
   BUILDLOGS_REDIS_HOSTNAME = 'logs.quay.io'
   BUILDLOGS_OPTIONS = []
 
+  # Build logs archive
+  LOG_ARCHIVE_TYPE = 'LocalArchivedLogs'
+  LOG_ARCHIVE_PATH = 'test/data/registry/logarchive'
+
   # Real-time user events
   USER_EVENTS_REDIS_HOSTNAME = 'logs.quay.io'
 
diff --git a/data/archivedlogs.py b/data/archivedlogs.py
new file mode 100644
index 000000000..0801b8815
--- /dev/null
+++ b/data/archivedlogs.py
@@ -0,0 +1,39 @@
+from data.userfiles import LocalUserfiles, UserfilesHandlers, S3Userfiles, FakeUserfiles
+
+
+class LogArchive(object):
+  def __init__(self, app=None):
+    self.app = app
+    if app is not None:
+      self.state = self.init_app(app)
+    else:
+      self.state = None
+
+  def init_app(self, app):
+    storage_type = app.config.get('LOG_ARCHIVE_TYPE', 'LocalArchivedLogs')
+    path = app.config.get('LOG_ARCHIVE_PATH', '')
+
+    if storage_type == 'LocalArchivedLogs':
+      archive = LocalUserfiles(app, path)
+      app.add_url_rule('/archivedlogs/<file_id>',
+                       view_func=UserfilesHandlers.as_view('log_archive_handlers',
+                                                           local_userfiles=archive))
+
+    elif storage_type == 'S3ArchivedLogs':
+      access_key = app.config.get('LOG_ARCHIVE_AWS_ACCESS_KEY', '')
+      secret_key = app.config.get('LOG_ARCHIVE_AWS_SECRET_KEY', '')
+      bucket = app.config.get('LOG_ARCHIVE_S3_BUCKET', '')
+      archive = S3Userfiles(path, access_key, secret_key, bucket)
+
+    elif storage_type == 'FakeArchivedLogs':
+      archive = FakeUserfiles()
+
+    else:
+      raise RuntimeError('Unknown log archive type: %s' % storage_type)
+
+    # register extension with app
+    app.extensions = getattr(app, 'extensions', {})
+    app.extensions['log_archive'] = archive
+    return archive
+
+  def __getattr__(self, name):
+    return getattr(self.state, name, None)
diff --git a/data/database.py b/data/database.py
index 4731a06bb..96e85a7d2 100644
--- a/data/database.py
+++ b/data/database.py
@@ -289,6 +289,16 @@ class RepositoryTag(BaseModel):
     )
 
 
+class BUILD_PHASE(object):
+  """ Build phases enum """
+  ERROR = 'error'
+  UNPACKING = 'unpacking'
+  PULLING = 'pulling'
+  BUILDING = 'building'
+  PUSHING = 'pushing'
+  COMPLETE = 'complete'
+
+
 class RepositoryBuild(BaseModel):
   uuid = CharField(default=uuid_generator, index=True)
   repository = ForeignKeyField(Repository, index=True)
@@ -300,6 +310,7 @@ class RepositoryBuild(BaseModel):
   display_name = CharField()
   trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
   pull_robot = ForeignKeyField(User, null=True, related_name='buildpullrobot')
+  logs_archived = BooleanField(default=False)
 
 
 class QueueItem(BaseModel):
diff --git a/data/model/legacy.py b/data/model/legacy.py
index 64bcdc860..92a130dca 100644
--- a/data/model/legacy.py
+++ b/data/model/legacy.py
@@ -12,6 +12,7 @@ from util.backoff import exponential_backoff
 
 
 EXPONENTIAL_BACKOFF_SCALE = timedelta(seconds=1)
+PRESUMED_DEAD_BUILD_AGE = timedelta(days=15)
 
 logger = logging.getLogger(__name__)
 
@@ -1877,3 +1878,11 @@ def confirm_email_authorization_for_repo(code):
   found.save()
 
   return found
+
+
+def archivable_buildlogs_query():
+  presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE
+  return (RepositoryBuild.select()
+          .where((RepositoryBuild.phase == BUILD_PHASE.COMPLETE) |
+                 (RepositoryBuild.phase == BUILD_PHASE.ERROR) |
+                 (RepositoryBuild.started < presumed_dead_date), RepositoryBuild.logs_archived == False))
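
In archivable_buildlogs_query() above, the two arguments passed to where() are ANDed by peewee while the phase/started conditions are ORed: a build qualifies once it has finished (complete or error) or has been running long enough to be presumed dead, and its logs have not yet been archived. Roughly the condition it expresses, shown only as an illustrative sketch (not generated output) and assuming peewee's default table and column naming:

    # SELECT ... FROM repositorybuild
    # WHERE (phase = 'complete' OR phase = 'error' OR started < :utcnow_minus_15_days)
    #   AND logs_archived = false
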
diff --git a/data/userfiles.py b/data/userfiles.py
index 79fbcb507..c12553493 100644
--- a/data/userfiles.py
+++ b/data/userfiles.py
@@ -58,9 +58,12 @@ class S3Userfiles(object):
                          encrypt_key=True)
     return (url, file_id)
 
-  def store_file(self, file_like_obj, content_type):
+  def store_file(self, file_like_obj, content_type, file_id=None):
     self._initialize_s3()
-    file_id = str(uuid4())
+
+    if file_id is None:
+      file_id = str(uuid4())
+
     full_key = os.path.join(self._prefix, file_id)
     k = Key(self._bucket, full_key)
     logger.debug('Setting s3 content type to: %s' % content_type)
@@ -161,8 +164,9 @@ class LocalUserfiles(object):
       except IOError:
         break
 
-  def store_file(self, file_like_obj, content_type):
-    file_id = str(uuid4())
+  def store_file(self, file_like_obj, content_type, file_id=None):
+    if file_id is None:
+      file_id = str(uuid4())
 
     # Rewind the file to match what s3 does
     file_like_obj.seek(0, os.SEEK_SET)
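
The new optional file_id parameter on both store_file() implementations lets a caller write under a stable key instead of a fresh uuid4() every time, so re-running the archiver simply overwrites an earlier partial upload rather than leaving duplicates. A hypothetical call, where log_file and build are illustrative names and the build uuid is assumed to be the key:

    log_archive.store_file(log_file, 'application/json', file_id=build.uuid)
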
diff --git a/util/streamingjsonencoder.py b/util/streamingjsonencoder.py
new file mode 100644
index 000000000..f51a4ec9b
--- /dev/null
+++ b/util/streamingjsonencoder.py
@@ -0,0 +1,267 @@
+# Adapted from https://gist.github.com/akaihola/1415730#file-streamingjson-py
+
+# Copyright (c) Django Software Foundation and individual contributors.
+# All rights reserved.
+
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+
+# 3. Neither the name of Django nor the names of its contributors may be used
+# to endorse or promote products derived from this software without
+# specific prior written permission.
+
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import collections
+import json
+from json.encoder import encode_basestring, encode_basestring_ascii, FLOAT_REPR, INFINITY
+from types import GeneratorType
+
+
+class StreamingJSONEncoder(json.JSONEncoder):
+  def iterencode(self, o, _one_shot=False):
+    """Encode the given object and yield each string
+    representation as available.
+
+    For example::
+
+        for chunk in StreamingJSONEncoder().iterencode(bigobject):
+            mysocket.write(chunk)
+
+    This method is a verbatim copy of
+    :meth:`json.JSONEncoder.iterencode`. It is
+    needed because we need to call our patched
+    :func:`streamingjsonencoder._make_iterencode`.
+    """
+    if self.check_circular:
+      markers = {}
+    else:
+      markers = None
+    if self.ensure_ascii:
+      _encoder = encode_basestring_ascii
+    else:
+      _encoder = encode_basestring
+    if self.encoding != 'utf-8':
+      def _encoder(o, _orig_encoder=_encoder, _encoding=self.encoding):
+        if isinstance(o, str):
+          o = o.decode(_encoding)
+        return _orig_encoder(o)
+
+    def floatstr(o, allow_nan=self.allow_nan, _repr=FLOAT_REPR, _inf=INFINITY, _neginf=-INFINITY):
+      # Check for specials. Note that this type of test is processor- and/or
+      # platform-specific, so do tests which don't depend on the internals.
+
+      if o != o:
+        text = 'NaN'
+      elif o == _inf:
+        text = 'Infinity'
+      elif o == _neginf:
+        text = '-Infinity'
+      else:
+        return _repr(o)
+
+      if not allow_nan:
+        raise ValueError("Out of range float values are not JSON compliant: %r"
+                         % (o,))
+
+      return text
+
+    _iterencode = _make_iterencode(
+        markers, self.default, _encoder, self.indent, floatstr,
+        self.key_separator, self.item_separator, self.sort_keys,
+        self.skipkeys, _one_shot)
+    return _iterencode(o, 0)
+
+
+def _make_iterencode(markers, _default, _encoder, _indent, _floatstr, _key_separator,
+                     _item_separator, _sort_keys, _skipkeys, _one_shot, False=False,
+                     True=True, ValueError=ValueError, basestring=basestring, dict=dict,
+                     float=float, GeneratorType=GeneratorType, id=id, int=int,
+                     isinstance=isinstance, list=list, long=long, str=str, tuple=tuple):
+  """
+  This is a patched version of
+  :func:`django.utils.simplejson.encoder.iterencode`. Whenever it encounters
+  a generator in the data structure, it encodes it as a JSON list.
+ """ + def _iterencode_list(lst, _current_indent_level): + if not lst: + # note: empty generators aren't caught here, see below + yield '[]' + return + if markers is not None: + markerid = id(lst) + if markerid in markers: + raise ValueError("Circular reference detected") + markers[markerid] = lst + buf = '[' + if _indent is not None: + _current_indent_level += 1 + newline_indent = '\n' + (' ' * (_indent * _current_indent_level)) + separator = _item_separator + newline_indent + buf += newline_indent + else: + newline_indent = None + separator = _item_separator + first = True + for value in lst: + if first: + first = False + else: + buf = separator + if isinstance(value, basestring): + yield buf + _encoder(value) + elif value is None: + yield buf + 'null' + elif value is True: + yield buf + 'true' + elif value is False: + yield buf + 'false' + elif isinstance(value, (int, long)): + yield buf + str(value) + elif isinstance(value, float): + yield buf + _floatstr(value) + else: + yield buf + if isinstance(value, (list, tuple, GeneratorType)): + chunks = _iterencode_list(value, _current_indent_level) + elif isinstance(value, dict): + chunks = _iterencode_dict(value, _current_indent_level) + else: + chunks = _iterencode(value, _current_indent_level) + for chunk in chunks: + yield chunk + if first: + # we had an empty generator + yield buf + if newline_indent is not None: + _current_indent_level -= 1 + yield '\n' + (' ' * (_indent * _current_indent_level)) + yield ']' + if markers is not None: + del markers[markerid] + + def _iterencode_dict(dct, _current_indent_level): + if not dct: + yield '{}' + return + if markers is not None: + markerid = id(dct) + if markerid in markers: + raise ValueError("Circular reference detected") + markers[markerid] = dct + yield '{' + if _indent is not None: + _current_indent_level += 1 + newline_indent = '\n' + (' ' * (_indent * _current_indent_level)) + item_separator = _item_separator + newline_indent + yield newline_indent + else: + newline_indent = None + item_separator = _item_separator + first = True + if _sort_keys: + items = dct.items() + items.sort(key=lambda kv: kv[0]) + else: + items = dct.iteritems() + for key, value in items: + if isinstance(key, basestring): + pass + # JavaScript is weakly typed for these, so it makes sense to + # also allow them. Many encoders seem to do something like this. 
diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py
new file mode 100644
index 000000000..c11131890
--- /dev/null
+++ b/workers/buildlogsarchiver.py
@@ -0,0 +1,42 @@
+import logging
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from peewee import fn
+from tempfile import SpooledTemporaryFile
+
+from data import model
+from data.database import configure, RepositoryBuild
+from app import app, build_logs, log_archive
+from util.streamingjsonencoder import StreamingJSONEncoder
+
+POLL_PERIOD_SECONDS = 30
+
+logger = logging.getLogger(__name__)
+sched = BlockingScheduler()
+
+@sched.scheduled_job(trigger='interval', seconds=5)
+def archive_redis_buildlogs():
+  """ Archive a single build, choosing a candidate at random. This process must be idempotent to
+      avoid needing two-phase commit. """
+  try:
+    # Get a random build to archive
+    to_archive = model.archivable_buildlogs_query().order_by(fn.Random()).get()
+    logger.debug('Archiving: %s', to_archive.uuid)
+
+    length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
+    to_encode = {
+      'start': 0,
+      'total': length,
+      'logs': entries,
+    }
+
+    for chunk in StreamingJSONEncoder().iterencode(to_encode):
+      print chunk
+
+  except RepositoryBuild.DoesNotExist:
+    logger.debug('No more builds to archive')
+
+
+if __name__ == "__main__":
+  logging.basicConfig(level=logging.DEBUG)
+  sched.start()
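
As committed, the scheduled job only prints the encoded chunks; the so-far-unused SpooledTemporaryFile and log_archive imports suggest where this is headed. A sketch of the likely follow-up inside archive_redis_buildlogs(), where the uuid-as-file_id choice and the final logs_archived flip are assumptions rather than part of this diff:

    # Hypothetical completion; to_encode and to_archive come from the surrounding function.
    with SpooledTemporaryFile() as log_file:
      for chunk in StreamingJSONEncoder().iterencode(to_encode):
        log_file.write(chunk)

      # The local store_file() rewinds the file before copying it; a stable file_id
      # keeps retries idempotent, matching the docstring's no-two-phase-commit goal.
      log_archive.store_file(log_file, 'application/json', file_id=to_archive.uuid)

    to_archive.logs_archived = True
    to_archive.save()
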
""" + try: + # Get a random build to archive + to_archive = model.archivable_buildlogs_query().order_by(fn.Random()).get() + logger.debug('Archiving: %s', to_archive.uuid) + + length, entries = build_logs.get_log_entries(to_archive.uuid, 0) + to_encode = { + 'start': 0, + 'total': length, + 'logs': entries, + } + + for chunk in StreamingJSONEncoder().iterencode(to_encode): + print chunk + + except RepositoryBuild.DoesNotExist: + logger.debug('No more builds to archive') + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + sched.start() diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py index b373a00a9..b9dfa99c1 100644 --- a/workers/dockerfilebuild.py +++ b/workers/dockerfilebuild.py @@ -1,6 +1,7 @@ import logging.config -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) import logging import argparse @@ -23,6 +24,7 @@ from collections import defaultdict from requests.exceptions import ConnectionError from data import model +from data.database import BUILD_PHASE from workers.worker import Worker, WorkerUnhealthyException, JobException from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue from endpoints.notificationhelper import spawn_notification @@ -545,7 +547,7 @@ class DockerfileBuildWorker(Worker): if c_type not in self._mime_processors: log_appender('error', build_logs.PHASE) - repository_build.phase = 'error' + repository_build.phase = BUILD_PHASE.ERROR repository_build.save() message = 'Unknown mime-type: %s' % c_type log_appender(message, build_logs.ERROR) @@ -554,7 +556,7 @@ class DockerfileBuildWorker(Worker): # Try to build the build directory package from the buildpack. log_appender('unpacking', build_logs.PHASE) - repository_build.phase = 'unpacking' + repository_build.phase = BUILD_PHASE.UNPACKING repository_build.save() build_dir = None @@ -572,20 +574,20 @@ class DockerfileBuildWorker(Worker): repository_build.uuid, self._cache_size_gb, pull_credentials) as build_ctxt: log_appender('pulling', build_logs.PHASE) - repository_build.phase = 'pulling' + repository_build.phase = BUILD_PHASE.PULLING repository_build.save() build_ctxt.pull() self.extend_processing(RESERVATION_TIME) log_appender('building', build_logs.PHASE) - repository_build.phase = 'building' + repository_build.phase = BUILD_PHASE.BUILDING repository_build.save() built_image = build_ctxt.build(self.extend_processing) if not built_image: log_appender('error', build_logs.PHASE) - repository_build.phase = 'error' + repository_build.phase = BUILD_PHASE.ERROR repository_build.save() message = 'Unable to build dockerfile.' @@ -598,13 +600,13 @@ class DockerfileBuildWorker(Worker): self.extend_processing(RESERVATION_TIME) log_appender('pushing', build_logs.PHASE) - repository_build.phase = 'pushing' + repository_build.phase = BUILD_PHASE.PUSHING repository_build.save() build_ctxt.push(built_image) log_appender('complete', build_logs.PHASE) - repository_build.phase = 'complete' + repository_build.phase = BUILD_PHASE.COMPLETE repository_build.save() # Spawn a notification that the build has completed. 
@@ -641,20 +643,20 @@ class DockerfileBuildWorker(Worker):
       sentry.client.captureException()
       log_appender('error', build_logs.PHASE)
       logger.exception('Exception when processing request.')
-      repository_build.phase = 'error'
+      repository_build.phase = BUILD_PHASE.ERROR
       repository_build.save()
       log_appender(str(exc), build_logs.ERROR)
 
       # Raise the exception to the queue.
       raise JobException(str(exc))
 
+if __name__ == "__main__":
+  desc = 'Worker daemon to monitor dockerfile build'
+  parser = argparse.ArgumentParser(description=desc)
+  parser.add_argument('--cachegb', default=20, type=float,
+                      help='Maximum cache size in gigabytes.')
+  args = parser.parse_args()
 
-desc = 'Worker daemon to monitor dockerfile build'
-parser = argparse.ArgumentParser(description=desc)
-parser.add_argument('--cachegb', default=20, type=float,
-                    help='Maximum cache size in gigabytes.')
-args = parser.parse_args()
-
-worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
-                               reservation_seconds=RESERVATION_TIME)
-worker.start(start_status_server_port=8000)
+  worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
+                                 reservation_seconds=RESERVATION_TIME)
+  worker.start(start_status_server_port=8000)