Archived logs commit 1. Squash me.

Jake Moshenko 2014-09-08 16:43:17 -04:00
parent 54fbb2a4c0
commit 451e034ca1
9 changed files with 402 additions and 22 deletions

app.py

@@ -20,6 +20,7 @@ from util.exceptionlog import Sentry
from util.queuemetrics import QueueMetrics
from data.billing import Billing
from data.buildlogs import BuildLogs
+from data.archivedlogs import LogArchive
from data.queue import WorkQueue
from data.userevent import UserEventsBuilderModule
from datetime import datetime
@@ -89,6 +90,7 @@ login_manager = LoginManager(app)
mail = Mail(app)
storage = Storage(app)
userfiles = Userfiles(app)
+log_archive = LogArchive(app)
analytics = Analytics(app)
billing = Billing(app)
sentry = Sentry(app)

config.py

@@ -83,6 +83,10 @@ class DefaultConfig(object):
  BUILDLOGS_REDIS_HOSTNAME = 'logs.quay.io'
  BUILDLOGS_OPTIONS = []

+  # Build logs archive
+  LOG_ARCHIVE_TYPE = 'LocalArchivedLogs'
+  LOG_ARCHIVE_PATH = 'test/data/registry/logarchive'

  # Real-time user events
  USER_EVENTS_REDIS_HOSTNAME = 'logs.quay.io'
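
For reference, switching the archive backend to S3 would be a configuration change only: the key names below are the ones LogArchive.init_app() reads in data/archivedlogs.py further down. All values are placeholders for illustration; this override is not part of the commit.

# Hypothetical S3 override (placeholder values); key names come from data/archivedlogs.py below.
LOG_ARCHIVE_TYPE = 'S3ArchivedLogs'
LOG_ARCHIVE_PATH = 'logarchive/'
LOG_ARCHIVE_AWS_ACCESS_KEY = 'AKIA...'
LOG_ARCHIVE_AWS_SECRET_KEY = '...'
LOG_ARCHIVE_S3_BUCKET = 'my-log-archive-bucket'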

data/archivedlogs.py Normal file

@@ -0,0 +1,39 @@
from data.userfiles import LocalUserfiles, UserfilesHandlers, S3Userfiles, FakeUserfiles


class LogArchive(object):
  def __init__(self, app=None):
    self.app = app
    if app is not None:
      self.state = self.init_app(app)
    else:
      self.state = None

  def init_app(self, app):
    storage_type = app.config.get('LOG_ARCHIVE_TYPE', 'LocalArchivedLogs')
    path = app.config.get('LOG_ARCHIVE_PATH', '')

    if storage_type == 'LocalArchivedLogs':
      archive = LocalUserfiles(app, path)
      app.add_url_rule('/archivedlogs/<file_id>',
                       view_func=UserfilesHandlers.as_view('log_archive_handlers',
                                                           local_userfiles=archive))
    elif storage_type == 'S3ArchivedLogs':
      access_key = app.config.get('LOG_ARCHIVE_AWS_ACCESS_KEY', '')
      secret_key = app.config.get('LOG_ARCHIVE_AWS_SECRET_KEY', '')
      bucket = app.config.get('LOG_ARCHIVE_S3_BUCKET', '')
      archive = S3Userfiles(path, access_key, secret_key, bucket)
    elif storage_type == 'FakeArchivedLogs':
      archive = FakeUserfiles()
    else:
      raise RuntimeError('Unknown log archive type: %s' % storage_type)

    # register extension with app
    app.extensions = getattr(app, 'extensions', {})
    app.extensions['log_archive'] = archive
    return archive

  def __getattr__(self, name):
    return getattr(self.state, name, None)
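
A minimal sketch of how the new extension is meant to be wired and used, mirroring the app.py change above. The file name and the idea of passing a build UUID as file_id are illustrative assumptions only; the commit itself just adds the optional file_id parameter to store_file() (see data/userfiles.py below).

# Illustrative sketch, not part of the commit.
from flask import Flask
from data.archivedlogs import LogArchive

app = Flask(__name__)
app.config['LOG_ARCHIVE_TYPE'] = 'LocalArchivedLogs'
app.config['LOG_ARCHIVE_PATH'] = 'test/data/registry/logarchive'

log_archive = LogArchive(app)

# LogArchive.__getattr__ proxies to the underlying Userfiles backend, so the
# extension exposes store_file(); an explicit file_id makes the key deterministic.
with open('build-log.json') as fp:
  log_archive.store_file(fp, 'application/json', file_id='hypothetical-build-uuid')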

data/database.py

@@ -289,6 +289,16 @@ class RepositoryTag(BaseModel):
    )


+class BUILD_PHASE(object):
+  """ Build phases enum """
+  ERROR = 'error'
+  UNPACKING = 'unpacking'
+  PULLING = 'pulling'
+  BUILDING = 'building'
+  PUSHING = 'pushing'
+  COMPLETE = 'complete'


class RepositoryBuild(BaseModel):
  uuid = CharField(default=uuid_generator, index=True)
  repository = ForeignKeyField(Repository, index=True)

@@ -300,6 +310,7 @@ class RepositoryBuild(BaseModel):
  display_name = CharField()
  trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
  pull_robot = ForeignKeyField(User, null=True, related_name='buildpullrobot')
+  logs_archived = BooleanField(default=False)


class QueueItem(BaseModel):

data/model.py

@@ -12,6 +12,7 @@ from util.backoff import exponential_backoff
EXPONENTIAL_BACKOFF_SCALE = timedelta(seconds=1)
+PRESUMED_DEAD_BUILD_AGE = timedelta(days=15)
logger = logging.getLogger(__name__)
@@ -1877,3 +1878,11 @@ def confirm_email_authorization_for_repo(code):
  found.save()
  return found


+def archivable_buildlogs_query():
+  presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE
+  return (RepositoryBuild.select()
+          .where((RepositoryBuild.phase == BUILD_PHASE.COMPLETE) |
+                 (RepositoryBuild.phase == BUILD_PHASE.ERROR) |
+                 (RepositoryBuild.started < presumed_dead_date),
+                 RepositoryBuild.logs_archived == False))

data/userfiles.py

@@ -58,9 +58,12 @@ class S3Userfiles(object):
                                            encrypt_key=True)
    return (url, file_id)

-  def store_file(self, file_like_obj, content_type):
+  def store_file(self, file_like_obj, content_type, file_id=None):
    self._initialize_s3()

-    file_id = str(uuid4())
+    if file_id is None:
+      file_id = str(uuid4())

    full_key = os.path.join(self._prefix, file_id)
    k = Key(self._bucket, full_key)
    logger.debug('Setting s3 content type to: %s' % content_type)
@@ -161,8 +164,9 @@ class LocalUserfiles(object):
      except IOError:
        break

-  def store_file(self, file_like_obj, content_type):
-    file_id = str(uuid4())
+  def store_file(self, file_like_obj, content_type, file_id=None):
+    if file_id is None:
+      file_id = str(uuid4())

    # Rewind the file to match what s3 does
    file_like_obj.seek(0, os.SEEK_SET)

util/streamingjsonencoder.py Normal file

@@ -0,0 +1,267 @@
# Adapted from https://gist.github.com/akaihola/1415730#file-streamingjson-py
# Copyright (c) Django Software Foundation and individual contributors.
# All rights reserved.
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Django nor the names of its contributors may be used
# to endorse or promote products derived from this software without
# specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import json
from json.encoder import encode_basestring, encode_basestring_ascii, FLOAT_REPR, INFINITY
from types import GeneratorType
class StreamingJSONEncoder(json.JSONEncoder):
  def iterencode(self, o, _one_shot=False):
    """Encode the given object and yield each string
    representation as available.

    For example::

        for chunk in StreamingJSONEncoder().iterencode(bigobject):
            mysocket.write(chunk)

    This method is a verbatim copy of
    :meth:`json.JSONEncoder.iterencode`. It is
    needed because we need to call our patched
    :func:`streamingjsonencoder._make_iterencode`.
    """
    if self.check_circular:
      markers = {}
    else:
      markers = None
    if self.ensure_ascii:
      _encoder = encode_basestring_ascii
    else:
      _encoder = encode_basestring
    if self.encoding != 'utf-8':
      def _encoder(o, _orig_encoder=_encoder, _encoding=self.encoding):
        if isinstance(o, str):
          o = o.decode(_encoding)
        return _orig_encoder(o)

    def floatstr(o, allow_nan=self.allow_nan, _repr=FLOAT_REPR, _inf=INFINITY, _neginf=-INFINITY):
      # Check for specials. Note that this type of test is processor- and/or
      # platform-specific, so do tests which don't depend on the internals.
      if o != o:
        text = 'NaN'
      elif o == _inf:
        text = 'Infinity'
      elif o == _neginf:
        text = '-Infinity'
      else:
        return _repr(o)

      if not allow_nan:
        raise ValueError("Out of range float values are not JSON compliant: %r"
                         % (o,))

      return text

    _iterencode = _make_iterencode(
        markers, self.default, _encoder, self.indent, floatstr,
        self.key_separator, self.item_separator, self.sort_keys,
        self.skipkeys, _one_shot)
    return _iterencode(o, 0)
def _make_iterencode(markers, _default, _encoder, _indent, _floatstr, _key_separator,
                     _item_separator, _sort_keys, _skipkeys, _one_shot, False=False, True=True,
                     ValueError=ValueError, basestring=basestring, dict=dict, float=float,
                     GeneratorType=GeneratorType, id=id, int=int, isinstance=isinstance, list=list,
                     long=long, str=str, tuple=tuple):
  """
  This is a patched version of
  :func:`django.utils.simplejson.encoder.iterencode`. Whenever it encounters
  a generator in the data structure, it encodes it as a JSON list.
  """
  def _iterencode_list(lst, _current_indent_level):
    if not lst:
      # note: empty generators aren't caught here, see below
      yield '[]'
      return
    if markers is not None:
      markerid = id(lst)
      if markerid in markers:
        raise ValueError("Circular reference detected")
      markers[markerid] = lst
    buf = '['
    if _indent is not None:
      _current_indent_level += 1
      newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
      separator = _item_separator + newline_indent
      buf += newline_indent
    else:
      newline_indent = None
      separator = _item_separator
    first = True
    for value in lst:
      if first:
        first = False
      else:
        buf = separator
      if isinstance(value, basestring):
        yield buf + _encoder(value)
      elif value is None:
        yield buf + 'null'
      elif value is True:
        yield buf + 'true'
      elif value is False:
        yield buf + 'false'
      elif isinstance(value, (int, long)):
        yield buf + str(value)
      elif isinstance(value, float):
        yield buf + _floatstr(value)
      else:
        yield buf
        if isinstance(value, (list, tuple, GeneratorType)):
          chunks = _iterencode_list(value, _current_indent_level)
        elif isinstance(value, dict):
          chunks = _iterencode_dict(value, _current_indent_level)
        else:
          chunks = _iterencode(value, _current_indent_level)
        for chunk in chunks:
          yield chunk
    if first:
      # we had an empty generator
      yield buf
    if newline_indent is not None:
      _current_indent_level -= 1
      yield '\n' + (' ' * (_indent * _current_indent_level))
    yield ']'
    if markers is not None:
      del markers[markerid]
  def _iterencode_dict(dct, _current_indent_level):
    if not dct:
      yield '{}'
      return
    if markers is not None:
      markerid = id(dct)
      if markerid in markers:
        raise ValueError("Circular reference detected")
      markers[markerid] = dct
    yield '{'
    if _indent is not None:
      _current_indent_level += 1
      newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
      item_separator = _item_separator + newline_indent
      yield newline_indent
    else:
      newline_indent = None
      item_separator = _item_separator
    first = True
    if _sort_keys:
      items = dct.items()
      items.sort(key=lambda kv: kv[0])
    else:
      items = dct.iteritems()
    for key, value in items:
      if isinstance(key, basestring):
        pass
      # JavaScript is weakly typed for these, so it makes sense to
      # also allow them. Many encoders seem to do something like this.
      elif isinstance(key, float):
        key = _floatstr(key)
      elif isinstance(key, (int, long)):
        key = str(key)
      elif key is True:
        key = 'true'
      elif key is False:
        key = 'false'
      elif key is None:
        key = 'null'
      elif _skipkeys:
        continue
      else:
        raise TypeError("key %r is not a string" % (key,))
      if first:
        first = False
      else:
        yield item_separator
      yield _encoder(key)
      yield _key_separator
      if isinstance(value, basestring):
        yield _encoder(value)
      elif value is None:
        yield 'null'
      elif value is True:
        yield 'true'
      elif value is False:
        yield 'false'
      elif isinstance(value, (int, long)):
        yield str(value)
      elif isinstance(value, float):
        yield _floatstr(value)
      else:
        if isinstance(value, collections.Mapping):
          chunks = _iterencode_dict(value, _current_indent_level)
        elif isinstance(value, collections.Iterable):
          chunks = _iterencode_list(value, _current_indent_level)
        else:
          chunks = _iterencode(value, _current_indent_level)
        for chunk in chunks:
          yield chunk
    if newline_indent is not None:
      _current_indent_level -= 1
      yield '\n' + (' ' * (_indent * _current_indent_level))
    yield '}'
    if markers is not None:
      del markers[markerid]
  def _iterencode(o, _current_indent_level):
    if isinstance(o, basestring):
      yield _encoder(o)
    elif o is None:
      yield 'null'
    elif o is True:
      yield 'true'
    elif o is False:
      yield 'false'
    elif isinstance(o, (int, long)):
      yield str(o)
    elif isinstance(o, float):
      yield _floatstr(o)
    elif isinstance(o, collections.Mapping):
      for chunk in _iterencode_dict(o, _current_indent_level):
        yield chunk
    elif isinstance(o, collections.Iterable):
      for chunk in _iterencode_list(o, _current_indent_level):
        yield chunk
    else:
      if markers is not None:
        markerid = id(o)
        if markerid in markers:
          raise ValueError("Circular reference detected")
        markers[markerid] = o
      o = _default(o)
      for chunk in _iterencode(o, _current_indent_level):
        yield chunk
      if markers is not None:
        del markers[markerid]

  return _iterencode
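
A short usage sketch for the encoder above: the point of the patch is that generators embedded in the structure are streamed out as JSON lists, where a plain json.dumps() would raise TypeError. The names below are illustrative only.

from util.streamingjsonencoder import StreamingJSONEncoder

def log_entries():
  # pretend these come out of Redis one at a time
  for num in range(3):
    yield {'message': 'line %d' % num}

payload = {'start': 0, 'total': 3, 'logs': log_entries()}

# iterencode yields string chunks; nothing is materialized up front
encoded = ''.join(StreamingJSONEncoder().iterencode(payload))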

workers/buildlogsarchiver.py Normal file

@@ -0,0 +1,42 @@
import logging

from apscheduler.schedulers.blocking import BlockingScheduler
from peewee import fn
from tempfile import SpooledTemporaryFile

from data import model
from data.database import configure, RepositoryBuild
from app import app, build_logs, log_archive
from util.streamingjsonencoder import StreamingJSONEncoder

POLL_PERIOD_SECONDS = 30

logger = logging.getLogger(__name__)
sched = BlockingScheduler()

@sched.scheduled_job(trigger='interval', seconds=5)
def archive_redis_buildlogs():
  """ Archive a single build, choosing a candidate at random. This process must be idempotent to
      avoid needing two-phase commit. """
  try:
    # Get a random build to archive
    to_archive = model.archivable_buildlogs_query().order_by(fn.Random()).get()
    logger.debug('Archiving: %s', to_archive.uuid)

    length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
    to_encode = {
      'start': 0,
      'total': length,
      'logs': entries,
    }

    for chunk in StreamingJSONEncoder().iterencode(to_encode):
      print chunk

  except RepositoryBuild.DoesNotExist:
    logger.debug('No more builds to archive')

if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG)
  sched.start()
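
The job above currently only prints the encoded chunks, and SpooledTemporaryFile is imported but not yet used, so the persistence step is presumably still to come. A hedged sketch of how the pieces could fit together, assuming the build UUID is used as the archive file id; none of this is in the commit.

from tempfile import SpooledTemporaryFile

from app import log_archive
from util.streamingjsonencoder import StreamingJSONEncoder

def archive_build_logs(to_archive, to_encode):
  # Spool the encoded JSON to a temporary file and hand it to the LogArchive
  # added in this commit; the local backend's store_file() seeks back to the
  # start itself (see data/userfiles.py above).
  with SpooledTemporaryFile() as tempfile:
    for chunk in StreamingJSONEncoder().iterencode(to_encode):
      tempfile.write(chunk)
    log_archive.store_file(tempfile, 'application/json', file_id=to_archive.uuid)

  # Flag the build so archivable_buildlogs_query() stops returning it.
  to_archive.logs_archived = True
  to_archive.save()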

workers/dockerfilebuild.py

@@ -1,6 +1,7 @@
import logging.config
-logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
+if __name__ == "__main__":
+  logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
import logging
import argparse
@@ -23,6 +24,7 @@ from collections import defaultdict
from requests.exceptions import ConnectionError
from data import model
+from data.database import BUILD_PHASE
from workers.worker import Worker, WorkerUnhealthyException, JobException
from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
from endpoints.notificationhelper import spawn_notification
@@ -545,7 +547,7 @@ class DockerfileBuildWorker(Worker):
      if c_type not in self._mime_processors:
        log_appender('error', build_logs.PHASE)
-        repository_build.phase = 'error'
+        repository_build.phase = BUILD_PHASE.ERROR
        repository_build.save()
        message = 'Unknown mime-type: %s' % c_type
        log_appender(message, build_logs.ERROR)
@@ -554,7 +556,7 @@ class DockerfileBuildWorker(Worker):
      # Try to build the build directory package from the buildpack.
      log_appender('unpacking', build_logs.PHASE)
-      repository_build.phase = 'unpacking'
+      repository_build.phase = BUILD_PHASE.UNPACKING
      repository_build.save()

      build_dir = None
@@ -572,20 +574,20 @@ class DockerfileBuildWorker(Worker):
                                     repository_build.uuid, self._cache_size_gb,
                                     pull_credentials) as build_ctxt:
        log_appender('pulling', build_logs.PHASE)
-        repository_build.phase = 'pulling'
+        repository_build.phase = BUILD_PHASE.PULLING
        repository_build.save()
        build_ctxt.pull()

        self.extend_processing(RESERVATION_TIME)

        log_appender('building', build_logs.PHASE)
-        repository_build.phase = 'building'
+        repository_build.phase = BUILD_PHASE.BUILDING
        repository_build.save()
        built_image = build_ctxt.build(self.extend_processing)

        if not built_image:
          log_appender('error', build_logs.PHASE)
-          repository_build.phase = 'error'
+          repository_build.phase = BUILD_PHASE.ERROR
          repository_build.save()
          message = 'Unable to build dockerfile.'
@@ -598,13 +600,13 @@ class DockerfileBuildWorker(Worker):
        self.extend_processing(RESERVATION_TIME)

        log_appender('pushing', build_logs.PHASE)
-        repository_build.phase = 'pushing'
+        repository_build.phase = BUILD_PHASE.PUSHING
        repository_build.save()
        build_ctxt.push(built_image)

        log_appender('complete', build_logs.PHASE)
-        repository_build.phase = 'complete'
+        repository_build.phase = BUILD_PHASE.COMPLETE
        repository_build.save()

        # Spawn a notification that the build has completed.
@@ -641,20 +643,20 @@ class DockerfileBuildWorker(Worker):
      sentry.client.captureException()
      log_appender('error', build_logs.PHASE)
      logger.exception('Exception when processing request.')
-      repository_build.phase = 'error'
+      repository_build.phase = BUILD_PHASE.ERROR
      repository_build.save()
      log_appender(str(exc), build_logs.ERROR)

      # Raise the exception to the queue.
      raise JobException(str(exc))


+if __name__ == "__main__":
-desc = 'Worker daemon to monitor dockerfile build'
-parser = argparse.ArgumentParser(description=desc)
-parser.add_argument('--cachegb', default=20, type=float,
-                    help='Maximum cache size in gigabytes.')
-args = parser.parse_args()
+  desc = 'Worker daemon to monitor dockerfile build'
+  parser = argparse.ArgumentParser(description=desc)
+  parser.add_argument('--cachegb', default=20, type=float,
+                      help='Maximum cache size in gigabytes.')
+  args = parser.parse_args()

-worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
-                               reservation_seconds=RESERVATION_TIME)
-worker.start(start_status_server_port=8000)
+  worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
+                                 reservation_seconds=RESERVATION_TIME)
+  worker.start(start_status_server_port=8000)