diff --git a/app.py b/app.py index c3b15d7aa..78243de75 100644 --- a/app.py +++ b/app.py @@ -32,6 +32,7 @@ from util.queuemetrics import QueueMetrics from util.config.provider import FileConfigProvider, TestConfigProvider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager +from buildman.jobutil.buildreporter import BuildMetrics OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' @@ -118,6 +119,7 @@ userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY) queue_metrics = QueueMetrics(app) +build_metrics = BuildMetrics(app) tf = app.config['DB_TRANSACTION_FACTORY'] diff --git a/buildman/enums.py b/buildman/enums.py new file mode 100644 index 000000000..3d38217fe --- /dev/null +++ b/buildman/enums.py @@ -0,0 +1,12 @@ +class BuildJobResult(object): + """ Build job result enum """ + INCOMPLETE = 'incomplete' + COMPLETE = 'complete' + ERROR = 'error' + + +class BuildServerStatus(object): + """ Build server status enum """ + STARTING = 'starting' + RUNNING = 'running' + SHUTDOWN = 'shutting_down' diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py new file mode 100644 index 000000000..16dd0ca5b --- /dev/null +++ b/buildman/jobutil/buildreporter.py @@ -0,0 +1,72 @@ +from trollius import From + +from buildman.enums import BuildJobResult +from util.cloudwatch import get_queue + + +class BuildReporter(object): + """ + Base class for reporting build statuses to a metrics service. + """ + def report_completion_status(self, status): + """ + Method to invoke the recording of build's completion status to a metric service. + """ + raise NotImplementedError + + +class NullReporter(BuildReporter): + """ + The /dev/null of BuildReporters. + """ + def report_completion_status(self, *args): + pass + + +class CloudWatchBuildReporter(BuildReporter): + """ + Implements a BuildReporter for Amazon's CloudWatch. + """ + def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): + self._queue = queue + self._namespace_name = namespace_name + self._completed_name = completed_name + self._failed_name = failed_name + self._incompleted_name = incompleted_name + + def _send_to_queue(self, *args, **kwargs): + self._queue.put((args, kwargs)) + + def report_completion_status(self, status): + if status == BuildJobResult.COMPLETE: + status_name = self._completed_name + elif status == BuildJobResult.ERROR: + status_name = self._failed_name + elif status == BuildJobResult.INCOMPLETE: + status_name = self._incompleted_name + else: + return + + self._send_to_queue(self._namespace_name, status_name, 1, unit='Count') + + +class BuildMetrics(object): + """ + BuildMetrics initializes a reporter for recording the status of build completions. + """ + def __init__(self, app=None): + self._app = app + self._reporter = NullReporter() + if app is not None: + reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config['BUILD_METRICS_NAMESPACE'] + completed_name = app.config['BUILD_METRICS_COMPLETED_NAME'] + failed_name = app.config['BUILD_METRICS_FAILED_NAME'] + incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME'] + request_queue = get_queue(app) + self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, + failed_name, incompleted_name) + + def __getattr__(self, name): + return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index f6ba9b4bc..855afc212 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -9,14 +9,15 @@ from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event +from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta +from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from data.queue import WorkQueue -from app import app +from app import app, build_metrics logger = logging.getLogger(__name__) @@ -27,12 +28,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 -class BuildJobResult(object): - """ Build job result enum """ - INCOMPLETE = 'incomplete' - COMPLETE = 'complete' - ERROR = 'error' - class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. @@ -40,7 +35,7 @@ class BuilderServer(object): def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None - self._current_status = 'starting' + self._current_status = BuildServerStatus.STARTING self._current_components = [] self._job_count = 0 @@ -60,7 +55,7 @@ class BuilderServer(object): self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() - self._current_status = 'running' + self._current_status = BuildServerStatus.RUNNING self._register_controller() @@ -97,8 +92,14 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) + + TASKS = [ + Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), + Task(self._queue_metrics_updater()), + ] + try: - loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl)) + loop.run_until_complete(trollius.wait(TASKS)) except KeyboardInterrupt: pass finally: @@ -106,7 +107,7 @@ class BuilderServer(object): def close(self): logger.debug('Requested server shutdown') - self._current_status = 'shutting_down' + self._current_status = BuildServerStatus.SHUTDOWN self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') @@ -147,12 +148,14 @@ class BuilderServer(object): self._job_count = self._job_count - 1 - if self._current_status == 'shutting_down' and not self._job_count: + if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() + build_metrics.report_completion_status(job_status) + @trollius.coroutine def _work_checker(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: with database.CloseForLongOperation(app.config): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @@ -183,7 +186,11 @@ class BuilderServer(object): logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) - + @trollius.coroutine + def _queue_metrics_updater(self): + while self._current_status == BuildServerStatus.RUNNING: + yield From(trollius.sleep(30)) + self._queue.update_metrics() @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): diff --git a/conf/gunicorn_local.py b/conf/gunicorn_local.py index 6987041be..49a30682d 100644 --- a/conf/gunicorn_local.py +++ b/conf/gunicorn_local.py @@ -1,7 +1,6 @@ bind = '0.0.0.0:5000' workers = 2 worker_class = 'gevent' -timeout = 2000 daemon = False logconfig = 'conf/logging_debug.conf' pythonpath = '.' diff --git a/conf/gunicorn_registry.py b/conf/gunicorn_registry.py index 4f7bb37f2..944608868 100644 --- a/conf/gunicorn_registry.py +++ b/conf/gunicorn_registry.py @@ -1,7 +1,6 @@ bind = 'unix:/tmp/gunicorn_registry.sock' workers = 8 worker_class = 'gevent' -timeout = 2000 logconfig = 'conf/logging.conf' pythonpath = '.' preload_app = True diff --git a/conf/gunicorn_verbs.py b/conf/gunicorn_verbs.py index eaf8041df..f329a8cbe 100644 --- a/conf/gunicorn_verbs.py +++ b/conf/gunicorn_verbs.py @@ -1,6 +1,5 @@ bind = 'unix:/tmp/gunicorn_verbs.sock' workers = 4 -timeout = 2000 logconfig = 'conf/logging.conf' pythonpath = '.' preload_app = True diff --git a/conf/gunicorn_web.py b/conf/gunicorn_web.py index bdfa8001a..cb9f78d24 100644 --- a/conf/gunicorn_web.py +++ b/conf/gunicorn_web.py @@ -1,7 +1,6 @@ bind = 'unix:/tmp/gunicorn_web.sock' workers = 2 worker_class = 'gevent' -timeout = 30 logconfig = 'conf/logging.conf' pythonpath = '.' preload_app = True diff --git a/conf/proxy-server-base.conf b/conf/proxy-server-base.conf index fb2f3f962..6230dbfd8 100644 --- a/conf/proxy-server-base.conf +++ b/conf/proxy-server-base.conf @@ -34,7 +34,6 @@ location /v1/repositories/ { proxy_request_buffering off; proxy_pass http://registry_app_server; - proxy_read_timeout 2000; proxy_temp_path /var/log/nginx/proxy_temp 1 2; client_max_body_size 20G; @@ -48,7 +47,6 @@ location /v1/ { proxy_request_buffering off; proxy_pass http://registry_app_server; - proxy_read_timeout 2000; proxy_temp_path /var/log/nginx/proxy_temp 1 2; client_max_body_size 20G; @@ -60,7 +58,6 @@ location /c1/ { proxy_request_buffering off; proxy_pass http://verbs_app_server; - proxy_read_timeout 2000; proxy_temp_path /var/log/nginx/proxy_temp 1 2; limit_req zone=api burst=5 nodelay; @@ -80,7 +77,6 @@ location /v1/_ping { location ~ ^/b1/controller(/?)(.*) { proxy_pass http://build_manager_controller_server/$2; - proxy_read_timeout 2000; } location ~ ^/b1/socket(/?)(.*) { diff --git a/conf/server-base.conf b/conf/server-base.conf index d5b211c52..4122a99eb 100644 --- a/conf/server-base.conf +++ b/conf/server-base.conf @@ -35,7 +35,6 @@ location /v1/ { proxy_request_buffering off; proxy_pass http://registry_app_server; - proxy_read_timeout 2000; proxy_temp_path /var/log/nginx/proxy_temp 1 2; client_max_body_size 20G; @@ -47,7 +46,6 @@ location /c1/ { proxy_request_buffering off; proxy_pass http://verbs_app_server; - proxy_read_timeout 2000; proxy_temp_path /var/log/nginx/proxy_temp 1 2; } @@ -65,7 +63,6 @@ location /v1/_ping { location ~ ^/b1/controller(/?)(.*) { proxy_pass http://build_manager_controller_server/$2; - proxy_read_timeout 2000; } location ~ ^/b1/socket(/?)(.*) { diff --git a/config.py b/config.py index 6a6c9c2e6..4bb44fca2 100644 --- a/config.py +++ b/config.py @@ -194,4 +194,7 @@ class DefaultConfig(object): SYSTEM_SERVICES_PATH = "conf/init/" # Services that should not be shown in the logs view. - SYSTEM_SERVICE_BLACKLIST = [] \ No newline at end of file + SYSTEM_SERVICE_BLACKLIST = [] + + # Temporary tag expiration in seconds, this may actually be longer based on GC policy + PUSH_TEMP_TAG_EXPIRATION_SEC = 60 * 60 diff --git a/data/database.py b/data/database.py index d23157c0c..162057530 100644 --- a/data/database.py +++ b/data/database.py @@ -469,6 +469,7 @@ class RepositoryTag(BaseModel): repository = ForeignKeyField(Repository) lifetime_start_ts = IntegerField(default=_get_epoch_timestamp) lifetime_end_ts = IntegerField(null=True, index=True) + hidden = BooleanField(default=False) class Meta: database = db diff --git a/data/migrations/versions/4ef04c61fcf9_allow_tags_to_be_marked_as_hidden.py b/data/migrations/versions/4ef04c61fcf9_allow_tags_to_be_marked_as_hidden.py new file mode 100644 index 000000000..e4fc1ea5e --- /dev/null +++ b/data/migrations/versions/4ef04c61fcf9_allow_tags_to_be_marked_as_hidden.py @@ -0,0 +1,26 @@ +"""Allow tags to be marked as hidden. + +Revision ID: 4ef04c61fcf9 +Revises: 509d2857566f +Create Date: 2015-02-18 16:34:16.586129 + +""" + +# revision identifiers, used by Alembic. +revision = '4ef04c61fcf9' +down_revision = '509d2857566f' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.add_column('repositorytag', sa.Column('hidden', sa.Boolean(), nullable=False, server_default=sa.sql.expression.false())) + ### end Alembic commands ### + + +def downgrade(tables): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('repositorytag', 'hidden') + ### end Alembic commands ### diff --git a/data/model/legacy.py b/data/model/legacy.py index fe675767e..331bf2720 100644 --- a/data/model/legacy.py +++ b/data/model/legacy.py @@ -5,6 +5,7 @@ import json import time from datetime import datetime, timedelta, date +from uuid import uuid4 from data.database import (User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, EmailConfirmation, FederatedLogin, @@ -1561,15 +1562,20 @@ def _tag_alive(query): (RepositoryTag.lifetime_end_ts > int(time.time()))) -def list_repository_tags(namespace_name, repository_name): - return _tag_alive(RepositoryTag - .select(RepositoryTag, Image) - .join(Repository) - .join(Namespace, on=(Repository.namespace_user == Namespace.id)) - .switch(RepositoryTag) - .join(Image) - .where(Repository.name == repository_name, - Namespace.username == namespace_name)) +def list_repository_tags(namespace_name, repository_name, include_hidden=False): + query = _tag_alive(RepositoryTag + .select(RepositoryTag, Image) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .switch(RepositoryTag) + .join(Image) + .where(Repository.name == repository_name, + Namespace.username == namespace_name)) + + if not include_hidden: + query = query.where(RepositoryTag.hidden == False) + + return query def _garbage_collect_tags(namespace_name, repository_name): @@ -1628,8 +1634,6 @@ def garbage_collect_repository(namespace_name, repository_name): logger.info('Garbage collecting storage for images: %s', to_remove) _garbage_collect_storage(storage_id_whitelist) - return len(to_remove) - def _garbage_collect_storage(storage_id_whitelist): if len(storage_id_whitelist) == 0: @@ -1788,10 +1792,8 @@ def create_or_update_tag(namespace_name, repository_name, tag_name, # No tag that needs to be ended pass - tag = RepositoryTag.create(repository=repo, image=image, name=tag_name, - lifetime_start_ts=now_ts) - - return tag + return RepositoryTag.create(repository=repo, image=image, name=tag_name, + lifetime_start_ts=now_ts) def delete_tag(namespace_name, repository_name, tag_name): @@ -1814,6 +1816,17 @@ def delete_tag(namespace_name, repository_name, tag_name): found.save() +def create_temporary_hidden_tag(repo, image, expiration_s): + """ Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name + of the temporary tag. """ + now_ts = int(time.time()) + expire_ts = now_ts + expiration_s + tag_name = str(uuid4()) + RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts, + lifetime_end_ts=expire_ts, hidden=True) + return tag_name + + def purge_all_repository_tags(namespace_name, repository_name): """ Immediately purge all repository tags without respecting the lifeline procedure """ try: diff --git a/endpoints/index.py b/endpoints/index.py index d1a902915..f2e1f7411 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -222,32 +222,6 @@ def create_repository(namespace, repository): repo = model.create_repository(namespace, repository, get_authenticated_user()) - logger.debug('Determining already added images') - added_images = OrderedDict([(desc['id'], desc) for desc in image_descriptions]) - new_repo_images = dict(added_images) - - # Optimization: Lookup any existing images in the repository with matching docker IDs and - # remove them from the added dict, so we don't need to look them up one-by-one. - def chunks(l, n): - for i in xrange(0, len(l), n): - yield l[i:i+n] - - # Note: We do this in chunks in an effort to not hit the SQL query size limit. - for chunk in chunks(new_repo_images.keys(), 50): - existing_images = model.lookup_repository_images(namespace, repository, chunk) - for existing in existing_images: - added_images.pop(existing.docker_image_id) - - logger.debug('Creating/Linking necessary images') - username = get_authenticated_user() and get_authenticated_user().username - translations = {} - for image_description in added_images.values(): - model.find_create_or_link_image(image_description['id'], repo, username, - translations, storage.preferred_locations[0]) - - - logger.debug('Created images') - track_and_log('push_repo', repo) return make_response('Created', 201) @@ -280,7 +254,7 @@ def update_images(namespace, repository): event.publish_event_data('docker-cli', user_data) logger.debug('GCing repository') - num_removed = model.garbage_collect_repository(namespace, repository) + model.garbage_collect_repository(namespace, repository) # Generate a job for each notification that has been added to this repo logger.debug('Adding notifications for repository') @@ -288,8 +262,8 @@ def update_images(namespace, repository): updated_tags = session.get('pushed_tags', {}) event_data = { 'updated_tags': updated_tags, - 'pruned_image_count': num_removed } + track_and_log('push_repo', repo) spawn_notification(repo, 'repo_push', event_data) return make_response('Updated', 204) diff --git a/endpoints/registry.py b/endpoints/registry.py index 5178f3a83..c901eed5b 100644 --- a/endpoints/registry.py +++ b/endpoints/registry.py @@ -9,6 +9,7 @@ from time import time from app import storage as store, image_diff_queue, app from auth.auth import process_auth, extract_namespace_repo_from_session +from auth.auth_context import get_authenticated_user from util import checksums, changes from util.http import abort, exact_abort from auth.permissions import (ReadRepositoryPermission, @@ -456,9 +457,19 @@ def put_image_json(namespace, repository, image_id): logger.debug('Looking up repo image') repo_image = model.get_repo_image_extended(namespace, repository, image_id) if not repo_image: - logger.debug('Image not found') - abort(404, 'Image %(image_id)s not found', issue='unknown-image', - image_id=image_id) + logger.debug('Image not found, creating image') + repo = model.get_repository(namespace, repository) + if repo is None: + abort(404, 'Repository does not exist: %(namespace)s/%(repository)s', issue='no-repo', + namespace=namespace, repository=repository) + + username = get_authenticated_user() and get_authenticated_user().username + repo_image = model.find_create_or_link_image(image_id, repo, username, {}, + store.preferred_locations[0]) + + # Create a temporary tag to prevent this image from getting garbage collected while the push + # is in progress. + model.create_temporary_hidden_tag(repo, repo_image, app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']) uuid = repo_image.storage.uuid diff --git a/local-test.sh b/local-test.sh index a54491969..2a85148bf 100755 --- a/local-test.sh +++ b/local-test.sh @@ -1 +1 @@ -TEST=true python -m unittest discover +TEST=true TROLLIUSDEBUG=1 python -m unittest discover diff --git a/test/data/test.db b/test/data/test.db index 0856c2f6a..4da80c978 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/util/cloudwatch.py b/util/cloudwatch.py new file mode 100644 index 000000000..b75dadf31 --- /dev/null +++ b/util/cloudwatch.py @@ -0,0 +1,47 @@ +import logging +import boto + +from Queue import Queue +from threading import Thread + + +logger = logging.getLogger(__name__) + +def get_queue(app): + """ + Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. + """ + access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY'] + secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY'] + + queue = Queue() + sender = CloudWatchSender(queue, access_key, secret_key) + sender.start() + return queue + +class CloudWatchSender(Thread): + """ + CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch. + """ + def __init__(self, request_queue, aws_access_key, aws_secret_key): + Thread.__init__(self) + self.daemon = True + + self._aws_access_key = aws_access_key + self._aws_secret_key = aws_secret_key + self._put_metrics_queue = request_queue + + def run(self): + try: + logger.debug('Starting CloudWatch sender process.') + connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) + except: + logger.exception('Failed to connect to CloudWatch.') + + while True: + put_metric_args, kwargs = self._put_metrics_queue.get() + logger.debug('Got queued put metrics request.') + try: + connection.put_metric_data(*put_metric_args, **kwargs) + except: + logger.exception('Failed to write to CloudWatch') diff --git a/util/queuemetrics.py b/util/queuemetrics.py index 1bebfa3a6..9e0a549f4 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -1,8 +1,6 @@ import logging -import boto -from Queue import Queue -from threading import Thread +from util.cloudwatch import get_queue logger = logging.getLogger(__name__) @@ -13,8 +11,8 @@ class NullReporter(object): pass -class QueueingCloudWatchReporter(object): - """ QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """ +class CloudWatchReporter(object): + """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): self._namespace = namespace self._need_capacity_name = need_capacity_name @@ -36,70 +34,23 @@ class QueueingCloudWatchReporter(object): self._send_to_queue(self._namespace, self._build_percent_name, building_percent, unit='Percent') - -class SendToCloudWatch(Thread): - """ SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to - CloudWatch. """ - def __init__(self, request_queue, aws_access_key, aws_secret_key): - Thread.__init__(self) - self.daemon = True - - self._aws_access_key = aws_access_key - self._aws_secret_key = aws_secret_key - self._put_metrics_queue = request_queue - - def run(self): - try: - logger.debug('Starting CloudWatch sender process.') - connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) - except: - logger.exception('Failed to connect to CloudWatch.') - - while True: - put_metric_args, kwargs = self._put_metrics_queue.get() - logger.debug('Got queued put metrics request.') - try: - connection.put_metric_data(*put_metric_args, **kwargs) - except: - logger.exception('Failed to write to CloudWatch') - - class QueueMetrics(object): + """ + QueueMetrics initializes a reporter for recording metrics of work queues. + """ def __init__(self, app=None): - self.app = app - self.sender = None + self._app = app + self._reporter = NullReporter() if app is not None: - self.state = self.init_app(app) - else: - self.state = None + reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config['QUEUE_METRICS_NAMESPACE'] + req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME'] + build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME'] - def init_app(self, app): - analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - - if analytics_type == 'CloudWatch': - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') - namespace = app.config.get('QUEUE_METRICS_NAMESPACE') - req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') - build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - - request_queue = Queue() - reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - self.sender = SendToCloudWatch(request_queue, access_key, secret_key) - else: - reporter = NullReporter() - - # register extension with app - app.extensions = getattr(app, 'extensions', {}) - app.extensions['queuemetrics'] = reporter - return reporter - - def run(self): - logger.debug('Asked to start CloudWatch reporter') - if self.sender is not None: - logger.debug('Starting CloudWatch reporter') - self.sender.start() + request_queue = get_queue(app) + self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) def __getattr__(self, name): - return getattr(self.state, name, None) + return getattr(self._reporter, name, None) diff --git a/web.py b/web.py index 92b3d6758..7c945cc45 100644 --- a/web.py +++ b/web.py @@ -1,7 +1,7 @@ import logging import logging.config -from app import app as application, queue_metrics +from app import app as application from endpoints.api import api_bp from endpoints.web import web @@ -9,9 +9,6 @@ from endpoints.webhooks import webhooks from endpoints.realtime import realtime from endpoints.callbacks import callback -# Start the cloudwatch reporting. -queue_metrics.run() - application.register_blueprint(web) application.register_blueprint(callback, url_prefix='/oauth2') application.register_blueprint(api_bp, url_prefix='/api')