commit 69c367514c
Author: Joseph Schorr
Date: 2014-09-12 14:01:17 -04:00
29 changed files with 712 additions and 99 deletions

View file

@ -38,6 +38,7 @@ ADD conf/init/gunicorn /etc/service/gunicorn
ADD conf/init/nginx /etc/service/nginx
ADD conf/init/diffsworker /etc/service/diffsworker
ADD conf/init/notificationworker /etc/service/notificationworker
ADD conf/init/buildlogsarchiver /etc/service/buildlogsarchiver
# Download any external libs.
RUN mkdir static/fonts static/ldn

app.py

@ -20,6 +20,7 @@ from util.exceptionlog import Sentry
from util.queuemetrics import QueueMetrics
from data.billing import Billing
from data.buildlogs import BuildLogs
from data.archivedlogs import LogArchive
from data.queue import WorkQueue
from data.userevent import UserEventsBuilderModule
from datetime import datetime
@ -89,6 +90,7 @@ login_manager = LoginManager(app)
mail = Mail(app)
storage = Storage(app)
userfiles = Userfiles(app, storage)
log_archive = LogArchive(app, storage)
analytics = Analytics(app)
billing = Billing(app)
sentry = Sentry(app)

View file

@ -0,0 +1,2 @@
#!/bin/sh
exec svlogd /var/log/buildlogsarchiver/

View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting build logs archiver worker'
cd /
venv/bin/python -m workers.buildlogsarchiver
echo 'Build logs archiver exited'

View file

@ -172,3 +172,7 @@ class DefaultConfig(object):
# Userfiles
USERFILES_LOCATION = 'local_us'
USERFILES_PATH = 'userfiles/'
# Build logs archive
LOG_ARCHIVE_LOCATION = 'local_us'
LOG_ARCHIVE_PATH = 'logarchive/'

data/archivedlogs.py (new file)

@ -0,0 +1,56 @@
import logging
from gzip import GzipFile
from flask import send_file, abort
from cStringIO import StringIO
from data.userfiles import DelegateUserfiles, UserfilesHandlers
JSON_MIMETYPE = 'application/json'
logger = logging.getLogger(__name__)
class LogArchiveHandlers(UserfilesHandlers):
def get(self, file_id):
path = self._files.get_file_id_path(file_id)
try:
with self._storage.stream_read_file(self._locations, path) as gzip_stream:
with GzipFile(fileobj=gzip_stream) as unzipped:
unzipped_buffer = StringIO(unzipped.read())
return send_file(unzipped_buffer, mimetype=JSON_MIMETYPE)
except IOError:
abort(404)
class LogArchive(object):
def __init__(self, app=None, distributed_storage=None):
self.app = app
if app is not None:
self.state = self.init_app(app, distributed_storage)
else:
self.state = None
def init_app(self, app, distributed_storage):
location = app.config.get('LOG_ARCHIVE_LOCATION')
path = app.config.get('LOG_ARCHIVE_PATH', None)
handler_name = 'logarchive_handlers'
log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
app.add_url_rule('/logarchive/<file_id>',
view_func=LogArchiveHandlers.as_view(handler_name,
distributed_storage=distributed_storage,
location=location,
files=log_archive))
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['log_archive'] = log_archive
return log_archive
def __getattr__(self, name):
return getattr(self.state, name, None)
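Since LogArchive.__getattr__ forwards unknown attributes to the DelegateUserfiles state, callers use the extension exactly like the userfiles helper once init_app has run. A minimal usage sketch, not part of the diff; the build UUID, file object, and helper name are illustrative:

from app import log_archive
from data.archivedlogs import JSON_MIMETYPE

def archive_json_blob(build_uuid, fileobj):
    # Delegates to DelegateUserfiles.store_file via LogArchive.__getattr__.
    return log_archive.store_file(fileobj, JSON_MIMETYPE,
                                  content_encoding='gzip', file_id=build_uuid)

# Clients can later be pointed at the archive directly:
# url = log_archive.get_file_url(build_uuid)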

View file

@ -3,7 +3,7 @@ import stripe
from datetime import datetime, timedelta
from calendar import timegm
from util.collections import AttrDict
from util.morecollections import AttrDict
PLANS = [
# Deprecated Plans

View file

@ -2,6 +2,11 @@ import redis
import json
from util.dynamic import import_class
from datetime import timedelta
ONE_DAY = timedelta(days=1)
class BuildStatusRetrievalError(Exception):
pass
@ -25,7 +30,7 @@ class RedisBuildLogs(object):
"""
return self._redis.rpush(self._logs_key(build_id), json.dumps(log_obj))
def append_log_message(self, build_id, log_message, log_type=None):
def append_log_message(self, build_id, log_message, log_type=None, log_data=None):
"""
Wraps the message in an envelope, pushes it to the end of the log entry
list, and returns the index at which it was inserted.
@ -37,6 +42,9 @@ class RedisBuildLogs(object):
if log_type:
log_obj['type'] = log_type
if log_data:
log_obj['data'] = log_data
return self._redis.rpush(self._logs_key(build_id), json.dumps(log_obj)) - 1
def get_log_entries(self, build_id, start_index):
@ -51,6 +59,13 @@ class RedisBuildLogs(object):
except redis.ConnectionError:
raise BuildStatusRetrievalError('Cannot retrieve build logs')
def expire_log_entries(self, build_id):
"""
Sets the build's log entries to expire in 1 day.
"""
self._redis.expire(self._logs_key(build_id), ONE_DAY)
@staticmethod
def _status_key(build_id):
return 'builds/%s/status' % build_id
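Taken together, the new hooks let a caller attach structured metadata to a log line and later let the raw Redis entries lapse once an archive exists elsewhere. A short sketch, assuming the build_logs instance constructed in app.py; the build UUID and message values are illustrative:

from app import build_logs

build_uuid = 'some-build-uuid'  # illustrative

# Attach a structured 'pull' step to the build log; the stored envelope gains a 'data' key.
build_logs.append_log_message(build_uuid, 'Pulling base image: ubuntu:14.04',
                              log_data={'phasestep': 'pull',
                                        'repo_url': 'ubuntu:14.04'})

# Once the log has been archived, let the Redis copy expire after a day.
build_logs.expire_log_entries(build_uuid)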

View file

@ -289,6 +289,16 @@ class RepositoryTag(BaseModel):
)
class BUILD_PHASE(object):
""" Build phases enum """
ERROR = 'error'
UNPACKING = 'unpacking'
PULLING = 'pulling'
BUILDING = 'building'
PUSHING = 'pushing'
COMPLETE = 'complete'
class RepositoryBuild(BaseModel):
uuid = CharField(default=uuid_generator, index=True)
repository = ForeignKeyField(Repository, index=True)
@ -300,6 +310,7 @@ class RepositoryBuild(BaseModel):
display_name = CharField()
trigger = ForeignKeyField(RepositoryBuildTrigger, null=True, index=True)
pull_robot = ForeignKeyField(User, null=True, related_name='buildpullrobot')
logs_archived = BooleanField(default=False)
class QueueItem(BaseModel):

View file

@ -8,7 +8,7 @@ from peewee import SqliteDatabase
from data.database import all_models, db
from app import app
from data.model.sqlalchemybridge import gen_sqlalchemy_metadata
from util.collections import AttrDict
from util.morecollections import AttrDict
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.

View file

@ -0,0 +1,26 @@
"""Add support for build log migration.
Revision ID: 34fd69f63809
Revises: 4a0c94399f38
Create Date: 2014-09-12 11:50:09.217777
"""
# revision identifiers, used by Alembic.
revision = '34fd69f63809'
down_revision = '4a0c94399f38'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.add_column('repositorybuild', sa.Column('logs_archived', sa.Boolean(), nullable=False, server_default=sa.sql.expression.false()))
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_column('repositorybuild', 'logs_archived')
### end Alembic commands ###

View file

@ -12,6 +12,7 @@ from util.backoff import exponential_backoff
EXPONENTIAL_BACKOFF_SCALE = timedelta(seconds=1)
PRESUMED_DEAD_BUILD_AGE = timedelta(days=15)
logger = logging.getLogger(__name__)
@ -1877,3 +1878,11 @@ def confirm_email_authorization_for_repo(code):
found.save()
return found
def archivable_buildlogs_query():
presumed_dead_date = datetime.utcnow() - PRESUMED_DEAD_BUILD_AGE
return (RepositoryBuild.select()
.where((RepositoryBuild.phase == BUILD_PHASE.COMPLETE) |
(RepositoryBuild.phase == BUILD_PHASE.ERROR) |
(RepositoryBuild.started < presumed_dead_date), RepositoryBuild.logs_archived == False))

View file

@ -81,10 +81,13 @@ class DelegateUserfiles(object):
return (url, file_id)
def store_file(self, file_like_obj, content_type):
def store_file(self, file_like_obj, content_type, content_encoding=None, file_id=None):
if file_id is None:
file_id = str(uuid4())
path = self.get_file_id_path(file_id)
self._storage.stream_write(self._locations, path, file_like_obj, content_type)
self._storage.stream_write(self._locations, path, file_like_obj, content_type,
content_encoding)
return file_id
def get_file_url(self, file_id, expires_in=300, requires_cors=False):

View file

@ -1,9 +1,9 @@
import logging
import json
from flask import request
from flask import request, redirect
from app import app, userfiles as user_files, build_logs
from app import app, userfiles as user_files, build_logs, log_archive
from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource,
require_repo_read, require_repo_write, validate_json_request,
ApiResource, internal_only, format_date, api, Unauthorized, NotFound)
@ -215,6 +215,10 @@ class RepositoryBuildLogs(RepositoryParamResource):
build = model.get_repository_build(namespace, repository, build_uuid)
# If the logs have been archived, just redirect to the completed archive
if build.logs_archived:
return redirect(log_archive.get_file_url(build.uuid))
start = int(request.args.get('start', 0))
try:

View file

@ -291,6 +291,9 @@ class GithubBuildTrigger(BuildTrigger):
with tarfile.open(fileobj=tarball) as archive:
tarball_subdir = archive.getnames()[0]
# Seek to position 0 to make boto multipart happy
tarball.seek(0)
dockerfile_id = user_files.store_file(tarball, TARBALL_MIME)
logger.debug('Successfully prepared job')

View file

@ -2558,7 +2558,7 @@ p.editable:hover i {
margin-top: 10px;
}
.repo-build .build-log-error-element {
.repo-build .build-log-error-element .error-message-container {
position: relative;
display: inline-block;
margin: 10px;
@ -2568,7 +2568,7 @@ p.editable:hover i {
margin-left: 22px;
}
.repo-build .build-log-error-element i.fa {
.repo-build .build-log-error-element .error-message-container i.fa {
color: red;
position: absolute;
top: 13px;

View file

@ -1,4 +1,23 @@
<span bindonce class="build-log-error-element">
<div bindonce class="build-log-error-element">
<span class="error-message-container">
<i class="fa fa-exclamation-triangle"></i>
<span class="error-message" bo-text="error.message"></span>
</span>
<span ng-if="error.message == 'HTTP code: 403' && getLocalPullInfo().isLocal">
caused by attempting to pull private repository <a href="/repository/{{ getLocalPullInfo().repo }}">{{ getLocalPullInfo().repo }}</a>
<span ng-if="getLocalPullInfo().login">with inaccessible crdentials</span>
<span ng-if="!getLocalPullInfo().login">without credentials</span>
</span>
</span>
<div class="alert alert-danger" ng-if="error.message == 'HTTP code: 403' && getLocalPullInfo().isLocal">
<div ng-if="getLocalPullInfo().login">
Note: The credentials <b>{{ getLocalPullInfo().login.username }}</b> for registry <b>{{ getLocalPullInfo().login.registry }}</b> cannot
access repository <a href="/repository/{{ getLocalPullInfo().repo }}">{{ getLocalPullInfo().repo }}</a>.
</div>
<div ng-if="!getLocalPullInfo().login">
Note: No robot account is specified for this build. Without such credentials, this pull will always fail. Please set up a new
build trigger with a robot account that has access to <a href="/repository/{{ getLocalPullInfo().repo }}">{{ getLocalPullInfo().repo }}</a> or make that repository public.
</div>
</div>
</div>

View file

@ -153,6 +153,14 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading
this.currentIndex_ = 0;
}
_ViewArray.prototype.length = function() {
return this.entries.length;
};
_ViewArray.prototype.get = function(index) {
return this.entries[index];
};
_ViewArray.prototype.push = function(elem) {
this.entries.push(elem);
this.hasEntries = true;
@ -215,6 +223,78 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading
return service;
}]);
/**
* Specialized class for conducting an HTTP poll, while properly preventing multiple calls.
*/
$provide.factory('AngularPollChannel', ['ApiService', '$timeout', function(ApiService, $timeout) {
var _PollChannel = function(scope, requester, opt_sleeptime) {
this.scope_ = scope;
this.requester_ = requester;
this.sleeptime_ = opt_sleeptime || (60 * 1000 /* 60s */);
this.timer_ = null;
this.working = false;
this.polling = false;
var that = this;
scope.$on('$destroy', function() {
that.stop();
});
};
_PollChannel.prototype.stop = function() {
if (this.timer_) {
$timeout.cancel(this.timer_);
this.timer_ = null;
this.polling = false;
}
this.working = false;
};
_PollChannel.prototype.start = function() {
// Make sure we invoke call outside the normal digest cycle, since
// we'll call $scope.$apply ourselves.
var that = this;
setTimeout(function() { that.call_(); }, 0);
};
_PollChannel.prototype.call_ = function() {
if (this.working) { return; }
var that = this;
this.working = true;
this.scope_.$apply(function() {
that.requester_(function(status) {
if (status) {
that.working = false;
that.setupTimer_();
} else {
that.stop();
}
});
});
};
_PollChannel.prototype.setupTimer_ = function() {
if (this.timer_) { return; }
var that = this;
this.polling = true;
this.timer_ = $timeout(function() {
that.timer_ = null;
that.call_();
}, this.sleeptime_)
};
var service = {
'create': function(scope, requester, opt_sleeptime) {
return new _PollChannel(scope, requester, opt_sleeptime);
}
};
return service;
}]);
$provide.factory('DataFileService', [function() {
var dataFileService = {};
@ -4368,9 +4448,48 @@ quayApp.directive('buildLogError', function () {
transclude: false,
restrict: 'C',
scope: {
'error': '=error'
'error': '=error',
'entries': '=entries'
},
controller: function($scope, $element) {
controller: function($scope, $element, Config) {
$scope.getLocalPullInfo = function() {
if ($scope.entries.__localpull !== undefined) {
return $scope.entries.__localpull;
}
var localInfo = {
'isLocal': false
};
// Find the 'pulling' phase entry, and then extract any metadata found under
// it.
for (var i = 0; i < $scope.entries.length; ++i) {
var entry = $scope.entries[i];
if (entry.type == 'phase' && entry.message == 'pulling') {
for (var j = 0; j < entry.logs.length(); ++j) {
var log = entry.logs.get(j);
if (log.data && log.data.phasestep == 'login') {
localInfo['login'] = log.data;
}
if (log.data && log.data.phasestep == 'pull') {
var repo_url = log.data['repo_url'];
var repo_and_tag = repo_url.substring(Config.SERVER_HOSTNAME.length + 1);
var tagIndex = repo_and_tag.lastIndexOf(':');
var repo = repo_and_tag.substring(0, tagIndex);
localInfo['repo_url'] = repo_url;
localInfo['repo'] = repo;
localInfo['isLocal'] = repo_url.indexOf(Config.SERVER_HOSTNAME + '/') == 0;
}
}
break;
}
}
return $scope.entries.__localpull = localInfo;
};
}
};
return directiveDefinitionObject;

View file

@ -978,14 +978,9 @@ function BuildPackageCtrl($scope, Restangular, ApiService, DataFileService, $rou
}
function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope, $location, $interval, $sanitize,
ansi2html, AngularViewArray) {
ansi2html, AngularViewArray, AngularPollChannel) {
var namespace = $routeParams.namespace;
var name = $routeParams.name;
var pollTimerHandle = null;
$scope.$on('$destroy', function() {
stopPollTimer();
});
// Watch for changes to the current parameter.
$scope.$on('$routeUpdate', function(){
@ -995,8 +990,7 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
});
$scope.builds = null;
$scope.polling = false;
$scope.pollChannel = null;
$scope.buildDialogShowCounter = 0;
$scope.showNewBuildDialog = function() {
@ -1081,8 +1075,6 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
$scope.setCurrentBuildInternal = function(index, build, opt_updateURL) {
if (build == $scope.currentBuild) { return; }
stopPollTimer();
$scope.logEntries = null;
$scope.logStartIndex = null;
$scope.currentParentEntry = null;
@ -1103,47 +1095,35 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
$scope.adjustLogHeight();
}, 1);
// Load the first set of logs.
getBuildStatusAndLogs();
// If the build is currently processing, start the build timer.
checkPollTimer();
};
var checkPollTimer = function() {
var build = $scope.currentBuild;
if (!build) {
stopPollTimer();
return;
// Stop any existing polling.
if ($scope.pollChannel) {
$scope.pollChannel.stop();
}
if (build['phase'] != 'complete' && build['phase'] != 'error') {
startPollTimer();
return true;
} else {
stopPollTimer();
return false;
}
// Create a new channel for polling the build status and logs.
var conductStatusAndLogRequest = function(callback) {
getBuildStatusAndLogs(build, callback);
};
var stopPollTimer = function() {
$interval.cancel(pollTimerHandle);
$scope.pollChannel = AngularPollChannel.create($scope, conductStatusAndLogRequest, 5 * 1000 /* 5s */);
$scope.pollChannel.start();
};
var startPollTimer = function() {
stopPollTimer();
pollTimerHandle = $interval(getBuildStatusAndLogs, 2000);
};
var processLogs = function(logs, startIndex) {
var processLogs = function(logs, startIndex, endIndex) {
if (!$scope.logEntries) { $scope.logEntries = []; }
// If the start index given is less than that requested, then we've received a larger
// pool of logs, and we need to only consider the new ones.
if (startIndex < $scope.logStartIndex) {
logs = logs.slice($scope.logStartIndex - startIndex);
}
for (var i = 0; i < logs.length; ++i) {
var entry = logs[i];
var type = entry['type'] || 'entry';
if (type == 'command' || type == 'phase' || type == 'error') {
entry['logs'] = AngularViewArray.create();
entry['index'] = startIndex + i;
entry['index'] = $scope.logStartIndex + i;
$scope.logEntries.push(entry);
$scope.currentParentEntry = entry;
@ -1151,18 +1131,19 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
$scope.currentParentEntry['logs'].push(entry);
}
}
return endIndex;
};
var getBuildStatusAndLogs = function() {
if (!$scope.currentBuild || $scope.polling) { return; }
$scope.polling = true;
var getBuildStatusAndLogs = function(build, callback) {
var params = {
'repository': namespace + '/' + name,
'build_uuid': $scope.currentBuild.id
'build_uuid': build.id
};
ApiService.getRepoBuildStatus(null, params, true).then(function(resp) {
if (build != $scope.currentBuild) { callback(false); return; }
// Note: We use extend here rather than replacing as Angular is depending on the
// root build object to remain the same object.
var matchingBuilds = $.grep($scope.builds, function(elem) {
@ -1177,22 +1158,16 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
$scope.builds.push(currentBuild);
}
checkPollTimer();
// Load the updated logs for the build.
var options = {
'start': $scope.logStartIndex
};
ApiService.getRepoBuildLogsAsResource(params, true).withOptions(options).get(function(resp) {
if ($scope.logStartIndex != null && resp['start'] != $scope.logStartIndex) {
$scope.polling = false;
return;
}
if (build != $scope.currentBuild) { callback(false); return; }
processLogs(resp['logs'], resp['start']);
$scope.logStartIndex = resp['total'];
$scope.polling = false;
// Process the logs we've received.
$scope.logStartIndex = processLogs(resp['logs'], resp['start'], resp['total']);
// If the build status is an error, open the last two log entries.
if (currentBuild['phase'] == 'error' && $scope.logEntries.length > 1) {
@ -1205,9 +1180,15 @@ function RepoBuildCtrl($scope, Restangular, ApiService, $routeParams, $rootScope
openLogEntries($scope.logEntries[$scope.logEntries.length - 2]);
openLogEntries($scope.logEntries[$scope.logEntries.length - 1]);
}
// If the build phase is an error or complete, then we mark the channel
// as closed.
callback(currentBuild['phase'] != 'error' && currentBuild['phase'] != 'complete');
}, function() {
$scope.polling = false;
callback(false);
});
}, function() {
callback(false);
});
};

View file

@ -77,7 +77,7 @@
<span class="container-content build-log-phase" phase="container"></span>
</div>
<div ng-switch-when="error">
<span class="container-content build-log-error" error="container"></span>
<span class="container-content build-log-error" error="container" entries="logEntries"></span>
</div>
<div ng-switch-when="command">
<span class="container-content build-log-command" command="container"></span>
@ -94,7 +94,7 @@
</div>
</div>
<div style="margin-top: 10px">
<span class="quay-spinner" ng-show="polling"></span>
<span class="quay-spinner" ng-show="pollChannel.working"></span>
<button class="btn" ng-show="(build.phase == 'error' || build.phase == 'complete') && build.resource_key"
ng-class="build.phase == 'error' ? 'btn-success' : 'btn-default'"
ng-click="askRestartBuild(build)">

View file

@ -75,7 +75,7 @@ class BaseStorage(StoragePaths):
def stream_read_file(self, path):
raise NotImplementedError
def stream_write(self, path, fp, content_type=None):
def stream_write(self, path, fp, content_type=None, content_encoding=None):
raise NotImplementedError
def list_directory(self, path=None):

View file

@ -128,7 +128,7 @@ class _CloudStorage(BaseStorage):
raise IOError('No such key: \'{0}\''.format(path))
return StreamReadKeyAsFile(key)
def stream_write(self, path, fp, content_type=None):
def stream_write(self, path, fp, content_type=None, content_encoding=None):
# Minimum size of upload part size on S3 is 5MB
self._initialize_cloud_conn()
buffer_size = 5 * 1024 * 1024
@ -140,6 +140,9 @@ class _CloudStorage(BaseStorage):
if content_type is not None:
metadata['Content-Type'] = content_type
if content_encoding is not None:
metadata['Content-Encoding'] = content_encoding
mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
**self._upload_params)
num_part = 1
@ -224,7 +227,7 @@ class GoogleCloudStorage(_CloudStorage):
connect_kwargs, upload_params, storage_path,
access_key, secret_key, bucket_name)
def stream_write(self, path, fp, content_type=None):
def stream_write(self, path, fp, content_type=None, content_encoding=None):
# Minimum size of upload part size on S3 is 5MB
self._initialize_cloud_conn()
path = self._init_path(path)
@ -233,6 +236,9 @@ class GoogleCloudStorage(_CloudStorage):
if content_type is not None:
key.set_metadata('Content-Type', content_type)
if content_encoding is not None:
key.set_metadata('Content-Encoding', content_encoding)
key.set_contents_from_stream(fp)

View file

@ -14,7 +14,7 @@ class FakeStorage(BaseStorage):
def stream_read(self, path):
yield ''
def stream_write(self, path, fp, content_type=None):
def stream_write(self, path, fp, content_type=None, content_encoding=None):
pass
def remove(self, path):

View file

@ -43,7 +43,7 @@ class LocalStorage(BaseStorage):
path = self._init_path(path)
return io.open(path, mode='rb')
def stream_write(self, path, fp, content_type=None):
def stream_write(self, path, fp, content_type=None, content_encoding=None):
# Size is mandatory
path = self._init_path(path, create=True)
with open(path, mode='wb') as f:

View file

@ -198,3 +198,11 @@ class TestBuildLogs(RedisBuildLogs):
return None
else:
return super(TestBuildLogs, self).get_status(build_id)
def expire_log_entries(self, build_id):
if build_id == self.test_build_id:
return
if not self.allow_delegate:
return None
else:
return super(TestBuildLogs, self).expire_log_entries(build_id)

View file

@ -0,0 +1,267 @@
# Adapted from https://gist.github.com/akaihola/1415730#file-streamingjson-py
# Copyright (c) Django Software Foundation and individual contributors.
# All rights reserved.
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Django nor the names of its contributors may be used
# to endorse or promote products derived from this software without
# specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import json
from json.encoder import encode_basestring, encode_basestring_ascii, FLOAT_REPR, INFINITY
from types import GeneratorType
class StreamingJSONEncoder(json.JSONEncoder):
def iterencode(self, o, _one_shot=False):
"""Encode the given object and yield each string
representation as available.
For example::
for chunk in StreamingJSONEncoder().iterencode(bigobject):
mysocket.write(chunk)
This method is a verbatim copy of
:meth:`json.JSONEncoder.iterencode`. It is
needed because we need to call our patched
:func:`streamingjsonencoder._make_iterencode`.
"""
if self.check_circular:
markers = {}
else:
markers = None
if self.ensure_ascii:
_encoder = encode_basestring_ascii
else:
_encoder = encode_basestring
if self.encoding != 'utf-8':
def _encoder(o, _orig_encoder=_encoder, _encoding=self.encoding):
if isinstance(o, str):
o = o.decode(_encoding)
return _orig_encoder(o)
def floatstr(o, allow_nan=self.allow_nan, _repr=FLOAT_REPR, _inf=INFINITY, _neginf=-INFINITY):
# Check for specials. Note that this type of test is processor- and/or
# platform-specific, so do tests which don't depend on the internals.
if o != o:
text = 'NaN'
elif o == _inf:
text = 'Infinity'
elif o == _neginf:
text = '-Infinity'
else:
return _repr(o)
if not allow_nan:
raise ValueError("Out of range float values are not JSON compliant: %r"
% (o,))
return text
_iterencode = _make_iterencode(
markers, self.default, _encoder, self.indent, floatstr,
self.key_separator, self.item_separator, self.sort_keys,
self.skipkeys, _one_shot)
return _iterencode(o, 0)
def _make_iterencode(markers, _default, _encoder, _indent, _floatstr, _key_separator,
_item_separator, _sort_keys, _skipkeys, _one_shot, False=False, True=True,
ValueError=ValueError, basestring=basestring, dict=dict, float=float,
GeneratorType=GeneratorType, id=id, int=int, isinstance=isinstance, list=list,
long=long, str=str, tuple=tuple):
"""
This is a patched version of
:func:`django.utils.simplejson.encoder.iterencode`. Whenever it encounters
a generator in the data structure, it encodes it as a JSON list.
"""
def _iterencode_list(lst, _current_indent_level):
if not lst:
# note: empty generators aren't caught here, see below
yield '[]'
return
if markers is not None:
markerid = id(lst)
if markerid in markers:
raise ValueError("Circular reference detected")
markers[markerid] = lst
buf = '['
if _indent is not None:
_current_indent_level += 1
newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
separator = _item_separator + newline_indent
buf += newline_indent
else:
newline_indent = None
separator = _item_separator
first = True
for value in lst:
if first:
first = False
else:
buf = separator
if isinstance(value, basestring):
yield buf + _encoder(value)
elif value is None:
yield buf + 'null'
elif value is True:
yield buf + 'true'
elif value is False:
yield buf + 'false'
elif isinstance(value, (int, long)):
yield buf + str(value)
elif isinstance(value, float):
yield buf + _floatstr(value)
else:
yield buf
if isinstance(value, (list, tuple, GeneratorType)):
chunks = _iterencode_list(value, _current_indent_level)
elif isinstance(value, dict):
chunks = _iterencode_dict(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
for chunk in chunks:
yield chunk
if first:
# we had an empty generator
yield buf
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + (' ' * (_indent * _current_indent_level))
yield ']'
if markers is not None:
del markers[markerid]
def _iterencode_dict(dct, _current_indent_level):
if not dct:
yield '{}'
return
if markers is not None:
markerid = id(dct)
if markerid in markers:
raise ValueError("Circular reference detected")
markers[markerid] = dct
yield '{'
if _indent is not None:
_current_indent_level += 1
newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
item_separator = _item_separator + newline_indent
yield newline_indent
else:
newline_indent = None
item_separator = _item_separator
first = True
if _sort_keys:
items = dct.items()
items.sort(key=lambda kv: kv[0])
else:
items = dct.iteritems()
for key, value in items:
if isinstance(key, basestring):
pass
# JavaScript is weakly typed for these, so it makes sense to
# also allow them. Many encoders seem to do something like this.
elif isinstance(key, float):
key = _floatstr(key)
elif isinstance(key, (int, long)):
key = str(key)
elif key is True:
key = 'true'
elif key is False:
key = 'false'
elif key is None:
key = 'null'
elif _skipkeys:
continue
else:
raise TypeError("key %r is not a string" % (key,))
if first:
first = False
else:
yield item_separator
yield _encoder(key)
yield _key_separator
if isinstance(value, basestring):
yield _encoder(value)
elif value is None:
yield 'null'
elif value is True:
yield 'true'
elif value is False:
yield 'false'
elif isinstance(value, (int, long)):
yield str(value)
elif isinstance(value, float):
yield _floatstr(value)
else:
if isinstance(value, collections.Mapping):
chunks = _iterencode_dict(value, _current_indent_level)
elif isinstance(value, collections.Iterable):
chunks = _iterencode_list(value, _current_indent_level)
else:
chunks = _iterencode(value, _current_indent_level)
for chunk in chunks:
yield chunk
if newline_indent is not None:
_current_indent_level -= 1
yield '\n' + (' ' * (_indent * _current_indent_level))
yield '}'
if markers is not None:
del markers[markerid]
def _iterencode(o, _current_indent_level):
if isinstance(o, basestring):
yield _encoder(o)
elif o is None:
yield 'null'
elif o is True:
yield 'true'
elif o is False:
yield 'false'
elif isinstance(o, (int, long)):
yield str(o)
elif isinstance(o, float):
yield _floatstr(o)
elif isinstance(o, collections.Mapping):
for chunk in _iterencode_dict(o, _current_indent_level):
yield chunk
elif isinstance(o, collections.Iterable):
for chunk in _iterencode_list(o, _current_indent_level):
yield chunk
else:
if markers is not None:
markerid = id(o)
if markerid in markers:
raise ValueError("Circular reference detected")
markers[markerid] = o
o = _default(o)
for chunk in _iterencode(o, _current_indent_level):
yield chunk
if markers is not None:
del markers[markerid]
return _iterencode
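The point of the patched encoder is that generators are emitted as JSON arrays chunk by chunk, so a large set of log entries never has to be materialized as a list. A usage sketch; the generator here is illustrative:

from util.streamingjsonencoder import StreamingJSONEncoder

def log_entries():
    for index in range(3):
        yield {'message': 'step %d' % index}

for chunk in StreamingJSONEncoder().iterencode({'start': 0, 'logs': log_entries()}):
    pass  # each chunk can be written straight to a file, socket, or gzip stream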

View file

@ -0,0 +1,55 @@
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from peewee import fn
from tempfile import SpooledTemporaryFile
from gzip import GzipFile
from data import model
from data.archivedlogs import JSON_MIMETYPE
from data.database import RepositoryBuild
from app import build_logs, log_archive
from util.streamingjsonencoder import StreamingJSONEncoder
POLL_PERIOD_SECONDS = 30
logger = logging.getLogger(__name__)
sched = BlockingScheduler()
@sched.scheduled_job(trigger='interval', seconds=POLL_PERIOD_SECONDS)
def archive_redis_buildlogs():
""" Archive a single build, choosing a candidate at random. This process must be idempotent to
avoid needing two-phase commit. """
try:
# Get a random build to archive
to_archive = model.archivable_buildlogs_query().order_by(fn.Random()).get()
logger.debug('Archiving: %s', to_archive.uuid)
length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
to_encode = {
'start': 0,
'total': length,
'logs': entries,
}
with SpooledTemporaryFile() as tempfile:
with GzipFile('testarchive', fileobj=tempfile) as zipstream:
for chunk in StreamingJSONEncoder().iterencode(to_encode):
zipstream.write(chunk)
tempfile.seek(0)
log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
file_id=to_archive.uuid)
to_archive.logs_archived = True
to_archive.save()
build_logs.expire_log_entries(to_archive.uuid)
except RepositoryBuild.DoesNotExist:
logger.debug('No more builds to archive')
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
sched.start()
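The archive produced above is a gzip stream whose payload has the same {'start', 'total', 'logs'} shape the build-logs API returns, which is why the API endpoint can simply redirect to it once logs_archived is set. A read-back sketch that reuses the same storage calls LogArchiveHandlers.get makes; the function name is illustrative:

import json
from gzip import GzipFile

def read_archived_logs(storage, locations, log_archive, build_uuid):
    # Mirrors LogArchiveHandlers.get: locate the blob, gunzip it, decode the JSON.
    path = log_archive.get_file_id_path(build_uuid)
    with storage.stream_read_file(locations, path) as gzip_stream:
        with GzipFile(fileobj=gzip_stream) as unzipped:
            return json.loads(unzipped.read())  # {'start': 0, 'total': ..., 'logs': [...]}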

View file

@ -1,6 +1,7 @@
import logging.config
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
import logging
import argparse
@ -23,6 +24,7 @@ from collections import defaultdict
from requests.exceptions import ConnectionError
from data import model
from data.database import BUILD_PHASE
from workers.worker import Worker, WorkerUnhealthyException, JobException
from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
from endpoints.notificationhelper import spawn_notification
@ -223,6 +225,13 @@ class DockerfileBuildContext(object):
if self._pull_credentials:
logger.debug('Logging in with pull credentials: %s@%s',
self._pull_credentials['username'], self._pull_credentials['registry'])
self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
'phasestep': 'login',
'username': self._pull_credentials['username'],
'registry': self._pull_credentials['registry']
})
self._build_cl.login(self._pull_credentials['username'], self._pull_credentials['password'],
registry=self._pull_credentials['registry'], reauth=True)
@ -233,7 +242,12 @@ class DockerfileBuildContext(object):
raise JobException('Missing FROM command in Dockerfile')
image_and_tag = ':'.join(image_and_tag_tuple)
self._build_logger('Pulling base image: %s' % image_and_tag)
self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
'phasestep': 'pull',
'repo_url': image_and_tag
})
pull_status = self._build_cl.pull(image_and_tag, stream=True)
self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')
@ -545,7 +559,7 @@ class DockerfileBuildWorker(Worker):
if c_type not in self._mime_processors:
log_appender('error', build_logs.PHASE)
repository_build.phase = 'error'
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
message = 'Unknown mime-type: %s' % c_type
log_appender(message, build_logs.ERROR)
@ -554,7 +568,7 @@ class DockerfileBuildWorker(Worker):
# Try to build the build directory package from the buildpack.
log_appender('unpacking', build_logs.PHASE)
repository_build.phase = 'unpacking'
repository_build.phase = BUILD_PHASE.UNPACKING
repository_build.save()
build_dir = None
@ -572,20 +586,20 @@ class DockerfileBuildWorker(Worker):
repository_build.uuid, self._cache_size_gb,
pull_credentials) as build_ctxt:
log_appender('pulling', build_logs.PHASE)
repository_build.phase = 'pulling'
repository_build.phase = BUILD_PHASE.PULLING
repository_build.save()
build_ctxt.pull()
self.extend_processing(RESERVATION_TIME)
log_appender('building', build_logs.PHASE)
repository_build.phase = 'building'
repository_build.phase = BUILD_PHASE.BUILDING
repository_build.save()
built_image = build_ctxt.build(self.extend_processing)
if not built_image:
log_appender('error', build_logs.PHASE)
repository_build.phase = 'error'
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
message = 'Unable to build dockerfile.'
@ -598,13 +612,13 @@ class DockerfileBuildWorker(Worker):
self.extend_processing(RESERVATION_TIME)
log_appender('pushing', build_logs.PHASE)
repository_build.phase = 'pushing'
repository_build.phase = BUILD_PHASE.PUSHING
repository_build.save()
build_ctxt.push(built_image)
log_appender('complete', build_logs.PHASE)
repository_build.phase = 'complete'
repository_build.phase = BUILD_PHASE.COMPLETE
repository_build.save()
# Spawn a notification that the build has completed.
@ -641,20 +655,20 @@ class DockerfileBuildWorker(Worker):
sentry.client.captureException()
log_appender('error', build_logs.PHASE)
logger.exception('Exception when processing request.')
repository_build.phase = 'error'
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
log_appender(str(exc), build_logs.ERROR)
# Raise the exception to the queue.
raise JobException(str(exc))
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--cachegb', default=20, type=float,
if __name__ == "__main__":
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--cachegb', default=20, type=float,
help='Maximum cache size in gigabytes.')
args = parser.parse_args()
args = parser.parse_args()
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
reservation_seconds=RESERVATION_TIME)
worker.start(start_status_server_port=8000)
worker.start(start_status_server_port=8000)
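The log_data dictionaries attached above become the 'data' key of the JSON envelopes that RedisBuildLogs stores (and that the archiver later writes out), which is what the frontend's getLocalPullInfo walks to explain 403 pull failures. Roughly, the entries under a 'pulling' phase look like this; the values are illustrative, only the keys come from the calls above:

pulling_phase_entries = [
    {'message': 'pulling', 'type': 'phase'},
    {'message': 'Pulling base image: quay.example/ns/repo:latest',
     'data': {'phasestep': 'login', 'username': 'ns+robot', 'registry': 'quay.example'}},
    {'message': 'Pulling base image: quay.example/ns/repo:latest',
     'data': {'phasestep': 'pull', 'repo_url': 'quay.example/ns/repo:latest'}},
]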