Merge branch 'master' of github.com:coreos-inc/quay

Joseph Schorr 2015-02-19 16:22:05 -05:00
commit f650479266
21 changed files with 249 additions and 144 deletions

app.py (2 changes)

@@ -32,6 +32,7 @@ from util.queuemetrics import QueueMetrics
from util.config.provider import FileConfigProvider, TestConfigProvider
from util.config.configutil import generate_secret_key
from util.config.superusermanager import SuperUserManager
from buildman.jobutil.buildreporter import BuildMetrics
OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
@@ -118,6 +119,7 @@ userevents = UserEventsBuilderModule(app)
superusers = SuperUserManager(app)
signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
queue_metrics = QueueMetrics(app)
build_metrics = BuildMetrics(app)
tf = app.config['DB_TRANSACTION_FACTORY']
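
BuildMetrics follows the same extension pattern as QueueMetrics above: it reads its configuration from the Flask app at construction time and proxies calls through to whichever reporter it built. A minimal sketch of that pattern in isolation (the app and config value here are illustrative, not part of this commit):

from flask import Flask
from buildman.jobutil.buildreporter import BuildMetrics

app = Flask(__name__)
app.config['BUILD_METRICS_TYPE'] = 'Null'  # the default; 'CloudWatch' enables reporting

build_metrics = BuildMetrics(app)
build_metrics.report_completion_status('complete')  # a no-op under the NullReporter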

buildman/enums.py (new file, 12 additions)

@@ -0,0 +1,12 @@
class BuildJobResult(object):
  """ Build job result enum """
  INCOMPLETE = 'incomplete'
  COMPLETE = 'complete'
  ERROR = 'error'


class BuildServerStatus(object):
  """ Build server status enum """
  STARTING = 'starting'
  RUNNING = 'running'
  SHUTDOWN = 'shutting_down'
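
These constants replace the string literals previously used in buildman/server.py (note the inline BuildJobResult class deleted below). A quick illustration of the values, since the status comparisons in server.py depend on them being plain strings:

from buildman.enums import BuildJobResult, BuildServerStatus

assert BuildJobResult.COMPLETE == 'complete'
assert BuildServerStatus.SHUTDOWN == 'shutting_down'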

buildman/jobutil/buildreporter.py (new file, 72 additions)

@@ -0,0 +1,72 @@
from trollius import From

from buildman.enums import BuildJobResult
from util.cloudwatch import get_queue


class BuildReporter(object):
  """
  Base class for reporting build statuses to a metrics service.
  """
  def report_completion_status(self, status):
    """
    Record a build's completion status to a metrics service.
    """
    raise NotImplementedError


class NullReporter(BuildReporter):
  """
  The /dev/null of BuildReporters.
  """
  def report_completion_status(self, *args):
    pass


class CloudWatchBuildReporter(BuildReporter):
  """
  Implements a BuildReporter for Amazon's CloudWatch.
  """
  def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name):
    self._queue = queue
    self._namespace_name = namespace_name
    self._completed_name = completed_name
    self._failed_name = failed_name
    self._incompleted_name = incompleted_name

  def _send_to_queue(self, *args, **kwargs):
    self._queue.put((args, kwargs))

  def report_completion_status(self, status):
    if status == BuildJobResult.COMPLETE:
      status_name = self._completed_name
    elif status == BuildJobResult.ERROR:
      status_name = self._failed_name
    elif status == BuildJobResult.INCOMPLETE:
      status_name = self._incompleted_name
    else:
      return

    self._send_to_queue(self._namespace_name, status_name, 1, unit='Count')


class BuildMetrics(object):
  """
  BuildMetrics initializes a reporter for recording the status of build completions.
  """
  def __init__(self, app=None):
    self._app = app
    self._reporter = NullReporter()
    if app is not None:
      reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null')
      if reporter_type == 'CloudWatch':
        namespace = app.config['BUILD_METRICS_NAMESPACE']
        completed_name = app.config['BUILD_METRICS_COMPLETED_NAME']
        failed_name = app.config['BUILD_METRICS_FAILED_NAME']
        incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME']
        request_queue = get_queue(app)
        self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name,
                                                 failed_name, incompleted_name)

  def __getattr__(self, name):
    return getattr(self._reporter, name, None)
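
The CloudWatch path needs four config keys beyond BUILD_METRICS_TYPE, plus the CLOUDWATCH_AWS_* credentials read by util.cloudwatch further down. An illustrative configuration (only the key names come from this commit; the values are made up):

BUILD_METRICS_TYPE = 'CloudWatch'
BUILD_METRICS_NAMESPACE = 'quay/builds'
BUILD_METRICS_COMPLETED_NAME = 'BuildsCompleted'
BUILD_METRICS_FAILED_NAME = 'BuildsFailed'
BUILD_METRICS_INCOMPLETED_NAME = 'BuildsIncompleted'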

buildman/server.py

@@ -9,14 +9,15 @@ from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event
from trollius.tasks import Task
from trollius.coroutines import From
from datetime import timedelta
from buildman.enums import BuildJobResult, BuildServerStatus
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from data.queue import WorkQueue
from app import app
from app import app, build_metrics
logger = logging.getLogger(__name__)
@@ -27,12 +28,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
HEARTBEAT_PERIOD_SEC = 30
class BuildJobResult(object):
""" Build job result enum """
INCOMPLETE = 'incomplete'
COMPLETE = 'complete'
ERROR = 'error'
class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
@@ -40,7 +35,7 @@ class BuilderServer(object):
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
lifecycle_manager_config, manager_hostname):
self._loop = None
self._current_status = 'starting'
self._current_status = BuildServerStatus.STARTING
self._current_components = []
self._job_count = 0
@@ -60,7 +55,7 @@ class BuilderServer(object):
self._lifecycle_manager_config = lifecycle_manager_config
self._shutdown_event = Event()
self._current_status = 'running'
self._current_status = BuildServerStatus.RUNNING
self._register_controller()
@@ -97,8 +92,14 @@ class BuilderServer(object):
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
controller_port)
TASKS = [
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
Task(self._queue_metrics_updater()),
]
try:
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
loop.run_until_complete(trollius.wait(TASKS))
except KeyboardInterrupt:
pass
finally:
@@ -106,7 +107,7 @@ class BuilderServer(object):
def close(self):
logger.debug('Requested server shutdown')
self._current_status = 'shutting_down'
self._current_status = BuildServerStatus.SHUTDOWN
self._lifecycle_manager.shutdown()
self._shutdown_event.wait()
logger.debug('Shutting down server')
@@ -147,12 +148,14 @@ class BuilderServer(object):
self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count:
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
self._shutdown_event.set()
build_metrics.report_completion_status(job_status)
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
while self._current_status == BuildServerStatus.RUNNING:
with database.CloseForLongOperation(app.config):
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@@ -183,7 +186,11 @@ class BuilderServer(object):
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
@trollius.coroutine
def _queue_metrics_updater(self):
while self._current_status == BuildServerStatus.RUNNING:
yield From(trollius.sleep(30))
self._queue.update_metrics()
@trollius.coroutine
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
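
Replacing the single run_until_complete() call with trollius.wait() over a list of Tasks is what lets _queue_metrics_updater tick every 30 seconds alongside the rest of the server. A self-contained sketch of the same pattern (the coroutine names and intervals are illustrative):

import trollius
from trollius import From
from trollius.tasks import Task

@trollius.coroutine
def periodic(name, interval_sec):
  # Yields control back to the event loop between ticks, as _queue_metrics_updater does.
  while True:
    yield From(trollius.sleep(interval_sec))
    print('%s tick' % name)

loop = trollius.get_event_loop()
tasks = [Task(periodic('queue-metrics', 30)), Task(periodic('work-check', 10))]
try:
  loop.run_until_complete(trollius.wait(tasks))
except KeyboardInterrupt:
  pass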

conf/gunicorn_local.py

@@ -1,7 +1,6 @@
bind = '0.0.0.0:5000'
workers = 2
worker_class = 'gevent'
timeout = 2000
daemon = False
logconfig = 'conf/logging_debug.conf'
pythonpath = '.'

conf/gunicorn_registry.py

@@ -1,7 +1,6 @@
bind = 'unix:/tmp/gunicorn_registry.sock'
workers = 8
worker_class = 'gevent'
timeout = 2000
logconfig = 'conf/logging.conf'
pythonpath = '.'
preload_app = True

conf/gunicorn_verbs.py

@@ -1,6 +1,5 @@
bind = 'unix:/tmp/gunicorn_verbs.sock'
workers = 4
timeout = 2000
logconfig = 'conf/logging.conf'
pythonpath = '.'
preload_app = True

conf/gunicorn_web.py

@@ -1,7 +1,6 @@
bind = 'unix:/tmp/gunicorn_web.sock'
workers = 2
worker_class = 'gevent'
timeout = 30
logconfig = 'conf/logging.conf'
pythonpath = '.'
preload_app = True

(nginx server config)

@@ -34,7 +34,6 @@ location /v1/repositories/ {
proxy_request_buffering off;
proxy_pass http://registry_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
client_max_body_size 20G;
@@ -48,7 +47,6 @@ location /v1/ {
proxy_request_buffering off;
proxy_pass http://registry_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
client_max_body_size 20G;
@@ -60,7 +58,6 @@ location /c1/ {
proxy_request_buffering off;
proxy_pass http://verbs_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
limit_req zone=api burst=5 nodelay;
@@ -80,7 +77,6 @@ location /v1/_ping {
location ~ ^/b1/controller(/?)(.*) {
proxy_pass http://build_manager_controller_server/$2;
proxy_read_timeout 2000;
}
location ~ ^/b1/socket(/?)(.*) {

(nginx server config)

@@ -35,7 +35,6 @@ location /v1/ {
proxy_request_buffering off;
proxy_pass http://registry_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
client_max_body_size 20G;
@@ -47,7 +46,6 @@ location /c1/ {
proxy_request_buffering off;
proxy_pass http://verbs_app_server;
proxy_read_timeout 2000;
proxy_temp_path /var/log/nginx/proxy_temp 1 2;
}
@@ -65,7 +63,6 @@ location /v1/_ping {
location ~ ^/b1/controller(/?)(.*) {
proxy_pass http://build_manager_controller_server/$2;
proxy_read_timeout 2000;
}
location ~ ^/b1/socket(/?)(.*) {

config.py

@@ -194,4 +194,7 @@ class DefaultConfig(object):
SYSTEM_SERVICES_PATH = "conf/init/"
# Services that should not be shown in the logs view.
SYSTEM_SERVICE_BLACKLIST = []
# Temporary tag expiration in seconds; the actual lifetime may be longer based on GC policy
PUSH_TEMP_TAG_EXPIRATION_SEC = 60 * 60

data/database.py

@@ -469,6 +469,7 @@ class RepositoryTag(BaseModel):
repository = ForeignKeyField(Repository)
lifetime_start_ts = IntegerField(default=_get_epoch_timestamp)
lifetime_end_ts = IntegerField(null=True, index=True)
hidden = BooleanField(default=False)
class Meta:
database = db

data/migrations/versions/4ef04c61fcf9_allow_tags_to_be_marked_as_hidden.py (new file, 26 additions)

@@ -0,0 +1,26 @@
"""Allow tags to be marked as hidden.
Revision ID: 4ef04c61fcf9
Revises: 509d2857566f
Create Date: 2015-02-18 16:34:16.586129
"""
# revision identifiers, used by Alembic.
revision = '4ef04c61fcf9'
down_revision = '509d2857566f'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.add_column('repositorytag', sa.Column('hidden', sa.Boolean(), nullable=False, server_default=sa.sql.expression.false()))
### end Alembic commands ###
def downgrade(tables):
### commands auto generated by Alembic - please adjust! ###
op.drop_column('repositorytag', 'hidden')
### end Alembic commands ###

data/model/legacy.py

@@ -5,6 +5,7 @@ import json
import time
from datetime import datetime, timedelta, date
from uuid import uuid4
from data.database import (User, Repository, Image, AccessToken, Role, RepositoryPermission,
Visibility, RepositoryTag, EmailConfirmation, FederatedLogin,
@@ -1561,15 +1562,20 @@ def _tag_alive(query):
(RepositoryTag.lifetime_end_ts > int(time.time())))
def list_repository_tags(namespace_name, repository_name):
return _tag_alive(RepositoryTag
.select(RepositoryTag, Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(RepositoryTag)
.join(Image)
.where(Repository.name == repository_name,
Namespace.username == namespace_name))
def list_repository_tags(namespace_name, repository_name, include_hidden=False):
query = _tag_alive(RepositoryTag
.select(RepositoryTag, Image)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(RepositoryTag)
.join(Image)
.where(Repository.name == repository_name,
Namespace.username == namespace_name))
if not include_hidden:
query = query.where(RepositoryTag.hidden == False)
return query
def _garbage_collect_tags(namespace_name, repository_name):
@@ -1628,8 +1634,6 @@ def garbage_collect_repository(namespace_name, repository_name):
logger.info('Garbage collecting storage for images: %s', to_remove)
_garbage_collect_storage(storage_id_whitelist)
return len(to_remove)
def _garbage_collect_storage(storage_id_whitelist):
if len(storage_id_whitelist) == 0:
@@ -1788,10 +1792,8 @@ def create_or_update_tag(namespace_name, repository_name, tag_name,
# No tag that needs to be ended
pass
tag = RepositoryTag.create(repository=repo, image=image, name=tag_name,
lifetime_start_ts=now_ts)
return tag
return RepositoryTag.create(repository=repo, image=image, name=tag_name,
lifetime_start_ts=now_ts)
def delete_tag(namespace_name, repository_name, tag_name):
@@ -1814,6 +1816,17 @@ def delete_tag(namespace_name, repository_name, tag_name):
found.save()
def create_temporary_hidden_tag(repo, image, expiration_s):
""" Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name
of the temporary tag. """
now_ts = int(time.time())
expire_ts = now_ts + expiration_s
tag_name = str(uuid4())
RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts,
lifetime_end_ts=expire_ts, hidden=True)
return tag_name
def purge_all_repository_tags(namespace_name, repository_name):
""" Immediately purge all repository tags without respecting the lifeline procedure """
try:
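
How the temporary tag fits together with the rest of the commit: _tag_alive() filters on lifetime_end_ts, so once expiration_s elapses the hidden tag stops protecting its image and a later garbage_collect_repository() call can reclaim it. A hedged usage sketch (the namespace and repository names are illustrative):

# Protect an in-flight image for an hour, as put_image_json now does.
tag_name = model.create_temporary_hidden_tag(repo, image, 60 * 60)

# Hidden tags are excluded from normal listings...
visible_tags = model.list_repository_tags('devtable', 'simple')

# ...but are returned when explicitly requested.
all_tags = model.list_repository_tags('devtable', 'simple', include_hidden=True)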

endpoints/index.py

@@ -222,32 +222,6 @@ def create_repository(namespace, repository):
repo = model.create_repository(namespace, repository,
get_authenticated_user())
logger.debug('Determining already added images')
added_images = OrderedDict([(desc['id'], desc) for desc in image_descriptions])
new_repo_images = dict(added_images)
# Optimization: Lookup any existing images in the repository with matching docker IDs and
# remove them from the added dict, so we don't need to look them up one-by-one.
def chunks(l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
# Note: We do this in chunks in an effort to not hit the SQL query size limit.
for chunk in chunks(new_repo_images.keys(), 50):
existing_images = model.lookup_repository_images(namespace, repository, chunk)
for existing in existing_images:
added_images.pop(existing.docker_image_id)
logger.debug('Creating/Linking necessary images')
username = get_authenticated_user() and get_authenticated_user().username
translations = {}
for image_description in added_images.values():
model.find_create_or_link_image(image_description['id'], repo, username,
translations, storage.preferred_locations[0])
logger.debug('Created images')
track_and_log('push_repo', repo)
return make_response('Created', 201)
@@ -280,7 +254,7 @@ def update_images(namespace, repository):
event.publish_event_data('docker-cli', user_data)
logger.debug('GCing repository')
num_removed = model.garbage_collect_repository(namespace, repository)
model.garbage_collect_repository(namespace, repository)
# Generate a job for each notification that has been added to this repo
logger.debug('Adding notifications for repository')
@@ -288,8 +262,8 @@ def update_images(namespace, repository):
updated_tags = session.get('pushed_tags', {})
event_data = {
'updated_tags': updated_tags,
'pruned_image_count': num_removed
}
track_and_log('push_repo', repo)
spawn_notification(repo, 'repo_push', event_data)
return make_response('Updated', 204)

endpoints/registry.py

@@ -9,6 +9,7 @@ from time import time
from app import storage as store, image_diff_queue, app
from auth.auth import process_auth, extract_namespace_repo_from_session
from auth.auth_context import get_authenticated_user
from util import checksums, changes
from util.http import abort, exact_abort
from auth.permissions import (ReadRepositoryPermission,
@@ -456,9 +457,19 @@ def put_image_json(namespace, repository, image_id):
logger.debug('Looking up repo image')
repo_image = model.get_repo_image_extended(namespace, repository, image_id)
if not repo_image:
logger.debug('Image not found')
abort(404, 'Image %(image_id)s not found', issue='unknown-image',
image_id=image_id)
logger.debug('Image not found, creating image')
repo = model.get_repository(namespace, repository)
if repo is None:
abort(404, 'Repository does not exist: %(namespace)s/%(repository)s', issue='no-repo',
namespace=namespace, repository=repository)
username = get_authenticated_user() and get_authenticated_user().username
repo_image = model.find_create_or_link_image(image_id, repo, username, {},
store.preferred_locations[0])
# Create a temporary tag to prevent this image from getting garbage collected while the push
# is in progress.
model.create_temporary_hidden_tag(repo, repo_image, app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
uuid = repo_image.storage.uuid

local-test.sh

@@ -1 +1 @@
TEST=true python -m unittest discover
TEST=true TROLLIUSDEBUG=1 python -m unittest discover

Binary file not shown.

util/cloudwatch.py (new file, 47 additions)

@@ -0,0 +1,47 @@
import logging
import boto

from Queue import Queue
from threading import Thread

logger = logging.getLogger(__name__)


def get_queue(app):
  """
  Creates a CloudWatchSender fed by a new queue, starts it, and returns the queue.
  """
  access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY']
  secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY']

  queue = Queue()
  sender = CloudWatchSender(queue, access_key, secret_key)
  sender.start()
  return queue


class CloudWatchSender(Thread):
  """
  CloudWatchSender loops indefinitely, pulling metrics off of a queue and sending them to
  CloudWatch.
  """
  def __init__(self, request_queue, aws_access_key, aws_secret_key):
    Thread.__init__(self)
    self.daemon = True

    self._aws_access_key = aws_access_key
    self._aws_secret_key = aws_secret_key
    self._put_metrics_queue = request_queue

  def run(self):
    try:
      logger.debug('Starting CloudWatch sender process.')
      connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
    except:
      logger.exception('Failed to connect to CloudWatch.')
      return  # Without a connection, the send loop below cannot run.

    while True:
      put_metric_args, kwargs = self._put_metrics_queue.get()
      logger.debug('Got queued put metrics request.')
      try:
        connection.put_metric_data(*put_metric_args, **kwargs)
      except:
        logger.exception('Failed to write to CloudWatch')
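
The producer side of util.cloudwatch is just a Queue of (args, kwargs) pairs that the sender thread forwards verbatim to boto's put_metric_data(). A minimal sketch, assuming app.config carries the two CLOUDWATCH_AWS_* keys (the namespace and metric name are illustrative):

from util.cloudwatch import get_queue

queue = get_queue(app)  # starts a daemon CloudWatchSender thread
queue.put((('quay/example', 'ExampleMetric', 1), {'unit': 'Count'}))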

util/queuemetrics.py

@@ -1,8 +1,6 @@
import logging
import boto
from Queue import Queue
from threading import Thread
from util.cloudwatch import get_queue
logger = logging.getLogger(__name__)
@@ -13,8 +11,8 @@ class NullReporter(object):
pass
class QueueingCloudWatchReporter(object):
""" QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """
class CloudWatchReporter(object):
""" CloudWatchReporter reports work queue metrics to CloudWatch """
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
self._namespace = namespace
self._need_capacity_name = need_capacity_name
@@ -36,70 +34,23 @@ class QueueingCloudWatchReporter(object):
self._send_to_queue(self._namespace, self._build_percent_name, building_percent,
unit='Percent')
class SendToCloudWatch(Thread):
""" SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
CloudWatch. """
def __init__(self, request_queue, aws_access_key, aws_secret_key):
Thread.__init__(self)
self.daemon = True
self._aws_access_key = aws_access_key
self._aws_secret_key = aws_secret_key
self._put_metrics_queue = request_queue
def run(self):
try:
logger.debug('Starting CloudWatch sender process.')
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
except:
logger.exception('Failed to connect to CloudWatch.')
while True:
put_metric_args, kwargs = self._put_metrics_queue.get()
logger.debug('Got queued put metrics request.')
try:
connection.put_metric_data(*put_metric_args, **kwargs)
except:
logger.exception('Failed to write to CloudWatch')
class QueueMetrics(object):
"""
QueueMetrics initializes a reporter for recording metrics of work queues.
"""
def __init__(self, app=None):
self.app = app
self.sender = None
self._app = app
self._reporter = NullReporter()
if app is not None:
self.state = self.init_app(app)
else:
self.state = None
reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
if reporter_type == 'CloudWatch':
namespace = app.config['QUEUE_METRICS_NAMESPACE']
req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME']
build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME']
def init_app(self, app):
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
if analytics_type == 'CloudWatch':
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
request_queue = Queue()
reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name,
build_percent_name)
self.sender = SendToCloudWatch(request_queue, access_key, secret_key)
else:
reporter = NullReporter()
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['queuemetrics'] = reporter
return reporter
def run(self):
logger.debug('Asked to start CloudWatch reporter')
if self.sender is not None:
logger.debug('Starting CloudWatch reporter')
self.sender.start()
request_queue = get_queue(app)
self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
build_percent_name)
def __getattr__(self, name):
return getattr(self.state, name, None)
return getattr(self._reporter, name, None)
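
After this refactor QueueMetrics no longer owns AWS credentials or a sender thread; it only needs its metric names, while the shared credentials live under the CLOUDWATCH_AWS_* keys read by util.cloudwatch. An illustrative configuration (the key names are from this commit; the values are made up):

QUEUE_METRICS_TYPE = 'CloudWatch'
QUEUE_METRICS_NAMESPACE = 'quay/queues'
QUEUE_METRICS_CAPACITY_SHORTAGE_NAME = 'CapacityShortage'
QUEUE_METRICS_BUILD_PERCENT_NAME = 'PercentBuilding'

# Shared by all CloudWatch reporters via util.cloudwatch.get_queue().
CLOUDWATCH_AWS_ACCESS_KEY = 'AKIA...'
CLOUDWATCH_AWS_SECRET_KEY = '...'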

web.py (5 changes)

@@ -1,7 +1,7 @@
import logging
import logging.config
from app import app as application, queue_metrics
from app import app as application
from endpoints.api import api_bp
from endpoints.web import web
@@ -9,9 +9,6 @@ from endpoints.webhooks import webhooks
from endpoints.realtime import realtime
from endpoints.callbacks import callback
# Start the cloudwatch reporting.
queue_metrics.run()
application.register_blueprint(web)
application.register_blueprint(callback, url_prefix='/oauth2')
application.register_blueprint(api_bp, url_prefix='/api')