diff --git a/app.py b/app.py
index f1b029b55..3fa1c3961 100644
--- a/app.py
+++ b/app.py
@@ -12,8 +12,10 @@ from storage import Storage
 from data.userfiles import Userfiles
 from util.analytics import Analytics
 from util.exceptionlog import Sentry
+from util.queuemetrics import QueueMetrics
 from data.billing import Billing
 from data.buildlogs import BuildLogs
+from data.queue import WorkQueue
 
 
 OVERRIDE_CONFIG_FILENAME = 'conf/stack/config.py'
@@ -48,3 +50,9 @@ analytics = Analytics(app)
 billing = Billing(app)
 sentry = Sentry(app)
 build_logs = BuildLogs(app)
+queue_metrics = QueueMetrics(app)
+
+image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
+dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'],
+                                   reporter=queue_metrics.report)
+webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'])
diff --git a/config.py b/config.py
index 54650e566..85d88239f 100644
--- a/config.py
+++ b/config.py
@@ -86,7 +86,10 @@ class DefaultConfig(object):
   USERFILES_PATH = 'test/data/registry/userfiles'
 
   # Analytics
-  ANALYTICS_TYPE = "FakeAnalytics"
+  ANALYTICS_TYPE = 'FakeAnalytics'
+
+  # Build Queue Metrics
+  QUEUE_METRICS_TYPE = 'Null'
 
   # Exception logging
   EXCEPTION_LOG_TYPE = 'FakeSentry'
diff --git a/data/queue.py b/data/queue.py
index 61a03a631..4a074e44c 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -11,18 +11,53 @@ MINIMUM_EXTENSION = timedelta(seconds=20)
 
 
 class WorkQueue(object):
-  def __init__(self, queue_name, canonical_name_match_list=None):
-    self.queue_name = queue_name
+  def __init__(self, queue_name, canonical_name_match_list=None, reporter=None):
+    self._queue_name = queue_name
+    self._reporter = reporter
 
     if canonical_name_match_list is None:
-      self.canonical_name_match_list = []
+      self._canonical_name_match_list = []
     else:
-      self.canonical_name_match_list = canonical_name_match_list
+      self._canonical_name_match_list = canonical_name_match_list
 
   @staticmethod
   def _canonical_name(name_list):
     return '/'.join(name_list) + '/'
 
+  def _running_jobs(self, now, name_match_query):
+    return (QueueItem
+            .select(QueueItem.queue_name)
+            .where(QueueItem.available == False,
+                   QueueItem.processing_expires > now,
+                   QueueItem.queue_name ** name_match_query))
+
+  def _name_match_query(self):
+    return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
+
+  def _report_queue_metrics(self):
+    if self._reporter is None:
+      return
+
+    now = datetime.now()
+    name_match_query = self._name_match_query()
+
+    total_jobs = (QueueItem
+                  .select(QueueItem.queue_name)
+                  .where(QueueItem.queue_name ** name_match_query,
+                         QueueItem.available_after <= now,
+                         ((QueueItem.available == True) | (QueueItem.processing_expires > now) |
+                          (QueueItem.retries_remaining > 0)))
+                  .distinct()
+                  .count())
+
+    running = self._running_jobs(now, name_match_query).distinct().count()
+
+    self._reporter(running, total_jobs)
+
+  def update_metrics(self):
+    with transaction_factory(db):
+      self._report_queue_metrics()
+
   def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
     """
     Put an item, if it shouldn't be processed for some number of seconds,
@@ -30,7 +65,7 @@ class WorkQueue(object):
     """
 
     params = {
-      'queue_name': self._canonical_name([self.queue_name] + canonical_name_list),
+      'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
       'body': message,
       'retries_remaining': retries_remaining,
     }
@@ -39,7 +74,9 @@ class WorkQueue(object):
       available_date = datetime.now() + timedelta(seconds=available_after)
       params['available_after'] = available_date
 
-    QueueItem.create(**params)
+    with transaction_factory(db):
+      QueueItem.create(**params)
+      self._report_queue_metrics()
 
   def get(self, processing_time=300):
     """
@@ -48,15 +85,10 @@ class WorkQueue(object):
     """
 
     now = datetime.now()
-    name_match_query = '%s%%' % self._canonical_name([self.queue_name] +
-                                                     self.canonical_name_match_list)
+    name_match_query = self._name_match_query()
 
     with transaction_factory(db):
-      running = (QueueItem
-                 .select(QueueItem.queue_name)
-                 .where(QueueItem.available == False,
-                        QueueItem.processing_expires > now,
-                        QueueItem.queue_name ** name_match_query))
+      running = self._running_jobs(now, name_match_query)
 
       avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query,
                                        QueueItem.available_after <= now,
@@ -67,6 +99,8 @@
 
       found = list(avail.limit(1).order_by(QueueItem.id))
 
+      item = None
+
       if found:
         item = found[0]
         item.available = False
@@ -74,24 +108,26 @@
         item.retries_remaining -= 1
         item.save()
 
-        return item
+      self._report_queue_metrics()
 
-    return None
+    return item
 
-  @staticmethod
-  def complete(completed_item):
-    completed_item.delete_instance()
+  def complete(self, completed_item):
+    with transaction_factory(db):
+      completed_item.delete_instance()
+      self._report_queue_metrics()
 
-  @staticmethod
-  def incomplete(incomplete_item, retry_after=300, restore_retry=False):
-    retry_date = datetime.now() + timedelta(seconds=retry_after)
-    incomplete_item.available_after = retry_date
-    incomplete_item.available = True
+  def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
+    with transaction_factory(db):
+      retry_date = datetime.now() + timedelta(seconds=retry_after)
+      incomplete_item.available_after = retry_date
+      incomplete_item.available = True
 
-    if restore_retry:
-      incomplete_item.retries_remaining += 1
+      if restore_retry:
+        incomplete_item.retries_remaining += 1
 
-    incomplete_item.save()
+      incomplete_item.save()
+      self._report_queue_metrics()
 
   @staticmethod
   def extend_processing(queue_item, seconds_from_now):
@@ -101,8 +137,3 @@
     if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
       queue_item.processing_expires = new_expiration
       queue_item.save()
-
-
-image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
-dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'])
-webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'])
diff --git a/endpoints/common.py b/endpoints/common.py
index e9bd7b7c6..ad2f3e66b 100644
--- a/endpoints/common.py
+++ b/endpoints/common.py
@@ -9,8 +9,7 @@ from flask.ext.principal import identity_changed
 from random import SystemRandom
 
 from data import model
-from data.queue import dockerfile_build_queue
-from app import app, login_manager
+from app import app, login_manager, dockerfile_build_queue
 from auth.permissions import QuayDeferredPermissionUser
 from auth import scopes
 from endpoints.api.discovery import swagger_route_data
diff --git a/endpoints/index.py b/endpoints/index.py
index 6ebec2d6c..25013f05e 100644
--- a/endpoints/index.py
+++ b/endpoints/index.py
@@ -8,8 +8,7 @@ from collections import OrderedDict
 
 from data import model
 from data.model import oauth
-from data.queue import webhook_queue
-from app import analytics, app
+from app import analytics, app, webhook_queue
 from auth.auth import process_auth
 from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token
 from util.names import parse_repository_name
diff --git a/endpoints/registry.py b/endpoints/registry.py
index 6c9800f5c..b2018a2b0 100644
--- a/endpoints/registry.py
+++ b/endpoints/registry.py
@@ -7,9 +7,7 @@ from functools import wraps
 from datetime import datetime
 from time import time
 
-from data.queue import image_diff_queue
-
-from app import storage as store
+from app import storage as store, image_diff_queue
 from auth.auth import process_auth, extract_namespace_repo_from_session
 from util import checksums, changes
 from util.http import abort
diff --git a/test/test_queue.py b/test/test_queue.py
index c29568951..433a350d8 100644
--- a/test/test_queue.py
+++ b/test/test_queue.py
@@ -9,12 +9,23 @@ from data.queue import WorkQueue
 
 QUEUE_NAME = 'testqueuename'
 
+class SaveLastCountReporter(object):
+  def __init__(self):
+    self.running = None
+    self.total = None
+
+  def __call__(self, running, total_jobs):
+    self.running = running
+    self.total = total_jobs
+
+
 class QueueTestCase(unittest.TestCase):
   TEST_MESSAGE_1 = json.dumps({'data': 1})
   TEST_MESSAGE_2 = json.dumps({'data': 2})
 
   def setUp(self):
-    self.queue = WorkQueue(QUEUE_NAME)
+    self.reporter = SaveLastCountReporter()
+    self.queue = WorkQueue(QUEUE_NAME, reporter=self.reporter)
     setup_database_for_testing(self)
 
   def tearDown(self):
@@ -23,33 +34,52 @@
 
 class TestQueue(QueueTestCase):
   def test_same_canonical_names(self):
+    self.assertEqual(self.reporter.running, None)
+    self.assertEqual(self.reporter.total, None)
+
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2)
+    self.assertEqual(self.reporter.running, 0)
+    self.assertEqual(self.reporter.total, 1)
 
     one = self.queue.get()
     self.assertNotEqual(None, one)
     self.assertEqual(self.TEST_MESSAGE_1, one.body)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 1)
 
     two_fail = self.queue.get()
     self.assertEqual(None, two_fail)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 1)
 
     self.queue.complete(one)
+    self.assertEqual(self.reporter.running, 0)
+    self.assertEqual(self.reporter.total, 1)
 
     two = self.queue.get()
     self.assertNotEqual(None, two)
     self.assertEqual(self.TEST_MESSAGE_2, two.body)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 1)
 
   def test_different_canonical_names(self):
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
     self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2)
+    self.assertEqual(self.reporter.running, 0)
+    self.assertEqual(self.reporter.total, 2)
 
     one = self.queue.get()
     self.assertNotEqual(None, one)
     self.assertEqual(self.TEST_MESSAGE_1, one.body)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 2)
 
     two = self.queue.get()
     self.assertNotEqual(None, two)
     self.assertEqual(self.TEST_MESSAGE_2, two.body)
+    self.assertEqual(self.reporter.running, 2)
+    self.assertEqual(self.reporter.total, 2)
 
   def test_canonical_name(self):
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
@@ -63,17 +93,26 @@
 
   def test_expiration(self):
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
+    self.assertEqual(self.reporter.running, 0)
+    self.assertEqual(self.reporter.total, 1)
 
     one = self.queue.get(processing_time=0.5)
     self.assertNotEqual(None, one)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 1)
 
     one_fail = self.queue.get()
    self.assertEqual(None, one_fail)
 
     time.sleep(1)
+    self.queue.update_metrics()
+    self.assertEqual(self.reporter.running, 0)
+    self.assertEqual(self.reporter.total, 1)
 
     one_again = self.queue.get()
     self.assertNotEqual(None, one_again)
+    self.assertEqual(self.reporter.running, 1)
+    self.assertEqual(self.reporter.total, 1)
 
   def test_specialized_queue(self):
     self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
diff --git a/util/queuemetrics.py b/util/queuemetrics.py
new file mode 100644
index 000000000..bc4784b4e
--- /dev/null
+++ b/util/queuemetrics.py
@@ -0,0 +1,51 @@
+import logging
+import boto
+
+
+logger = logging.getLogger(__name__)
+
+
+class NullReporter(object):
+  def report(self, running_count, total_count):
+    pass
+
+
+class CloudWatchReporter(object):
+  def __init__(self, aws_access_key, aws_secret_key, namespace, name):
+    self._connection = boto.connect_cloudwatch(aws_access_key, aws_secret_key)
+    self._namespace = namespace
+    self._name = name
+
+  def report(self, running_count, total_count):
+    need_capacity_count = total_count - running_count
+    self._connection.put_metric_data(self._namespace, self._name, need_capacity_count,
+                                     unit='Count')
+
+
+class QueueMetrics(object):
+  def __init__(self, app=None):
+    self.app = app
+    if app is not None:
+      self.state = self.init_app(app)
+    else:
+      self.state = None
+
+  def init_app(self, app):
+    analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
+
+    if analytics_type == 'CloudWatch':
+      access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY', '')
+      secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY', '')
+      namespace = app.config.get('QUEUE_METRICS_NAMESPACE', '')
+      name = app.config.get('QUEUE_METRICS_NAME', '')
+      reporter = CloudWatchReporter(access_key, secret_key, namespace, name)
+    else:
+      reporter = NullReporter()
+
+    # register extension with app
+    app.extensions = getattr(app, 'extensions', {})
+    app.extensions['queuemetrics'] = reporter
+    return reporter
+
+  def __getattr__(self, name):
+    return getattr(self.state, name, None)
diff --git a/workers/diffsworker.py b/workers/diffsworker.py
index 85b615cbe..70f74f1db 100644
--- a/workers/diffsworker.py
+++ b/workers/diffsworker.py
@@ -1,7 +1,7 @@
 import logging
 import argparse
 
-from data.queue import image_diff_queue
+from app import image_diff_queue
 from data.model import DataModelException
 from endpoints.registry import process_image_changes
 from workers.worker import Worker
diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py
index dbd9ab3a4..bbd326775 100644
--- a/workers/dockerfilebuild.py
+++ b/workers/dockerfilebuild.py
@@ -18,10 +18,9 @@ from threading import Event
 from uuid import uuid4
 from collections import defaultdict
 
-from data.queue import dockerfile_build_queue
 from data import model
 from workers.worker import Worker, WorkerUnhealthyException, JobException
-from app import userfiles as user_files, build_logs, sentry
+from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
 from util.safetar import safe_extractall
 from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
diff --git a/workers/webhookworker.py b/workers/webhookworker.py
index 2b785acb6..ccff884c2 100644
--- a/workers/webhookworker.py
+++ b/workers/webhookworker.py
@@ -3,7 +3,7 @@ import argparse
 import requests
 import json
 
-from data.queue import webhook_queue
+from app import webhook_queue
 from workers.worker import Worker
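
Reviewer note (not part of the patch): reporting stays off by default, since DefaultConfig sets QUEUE_METRICS_TYPE = 'Null' and init_app() then wires up NullReporter. A minimal sketch of how a deployment might opt into the CloudWatch reporter by overriding the new keys in its stack config; the namespace and metric-name values below are placeholders, not anything defined by this change:

    # conf/stack/config.py (sketch; these keys are read in util/queuemetrics.py init_app)
    QUEUE_METRICS_TYPE = 'CloudWatch'
    QUEUE_METRICS_AWS_ACCESS_KEY = '<IAM key allowed cloudwatch:PutMetricData>'
    QUEUE_METRICS_AWS_SECRET_KEY = '<matching secret key>'
    QUEUE_METRICS_NAMESPACE = 'Quay/BuildQueue'       # placeholder namespace
    QUEUE_METRICS_NAME = 'NeededBuildCapacity'        # placeholder metric name

With these set, CloudWatchReporter.report() publishes total_count - running_count as a single 'Count' metric, i.e. the number of live jobs that are not currently being processed.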
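
Reviewer note (not part of the patch): the reporter argument to WorkQueue is just a callable invoked as reporter(running_count, total_jobs) from _report_queue_metrics(); dockerfile_build_queue passes queue_metrics.report and the tests pass a SaveLastCountReporter instance. A small sketch of an alternative reporter that only logs; the function name and wiring are hypothetical:

    import logging

    logger = logging.getLogger(__name__)

    def log_queue_pressure(running_count, total_count):
      # Receives the current in-flight count and the total count of live jobs.
      logger.info('build queue: %s running / %s total', running_count, total_count)

    # e.g. in app.py:
    # dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'],
    #                                    reporter=log_queue_pressure)

Anything with this (running, total) call signature can be dropped in, which is what makes the NullReporter/CloudWatchReporter split and the test double in test_queue.py work.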