From 0b6552d6ccf2a08b46669459fdf95fc47ad6038c Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 23 May 2014 14:16:26 -0400 Subject: [PATCH] Fix the metrics so they are usable for scaling the workers down and up. Switch all datetimes which touch the database from now to utcnow. Fix the worker Dockerfile. --- Dockerfile.buildworker | 2 ++ auth/auth.py | 2 +- data/billing.py | 8 ++--- data/model/oauth.py | 6 ++-- data/queue.py | 51 ++++++++++++++--------------- test/test_queue.py | 66 +++++++++++++++++++++++++++----------- util/analytics.py | 2 +- util/queuemetrics.py | 59 ++++++++++++++++++++++++++-------- workers/dockerfilebuild.py | 12 +++---- workers/worker.py | 4 +++ 10 files changed, 137 insertions(+), 75 deletions(-) diff --git a/Dockerfile.buildworker b/Dockerfile.buildworker index 4ad6ba6ff..d49387dfa 100644 --- a/Dockerfile.buildworker +++ b/Dockerfile.buildworker @@ -8,6 +8,8 @@ RUN apt-get install -y git python-virtualenv python-dev libjpeg8 libjpeg62-dev l ### End common section ### +RUN apt-get install -y libldap2-dev libsasl2-dev + RUN apt-get install -y lxc aufs-tools RUN usermod -v 100000-200000 -w 100000-200000 root diff --git a/auth/auth.py b/auth/auth.py index 715b5a0dd..3616792ad 100644 --- a/auth/auth.py +++ b/auth/auth.py @@ -41,7 +41,7 @@ def _validate_and_apply_oauth_token(token): } abort(401, message='OAuth access token could not be validated: %(token)s', issue='invalid-oauth-token', token=token, headers=authenticate_header) - elif validated.expires_at <= datetime.now(): + elif validated.expires_at <= datetime.utcnow(): logger.info('OAuth access with an expired token: %s', token) authenticate_header = { 'WWW-Authenticate': ('Bearer error="invalid_token", ' diff --git a/data/billing.py b/data/billing.py index 8872ad87f..723760210 100644 --- a/data/billing.py +++ b/data/billing.py @@ -131,10 +131,10 @@ class FakeStripe(object): FAKE_SUBSCRIPTION = AttrDict({ 'plan': FAKE_PLAN, - 'current_period_start': 
timegm(datetime.now().utctimetuple()), - 'current_period_end': timegm((datetime.now() + timedelta(days=30)).utctimetuple()), - 'trial_start': timegm(datetime.now().utctimetuple()), - 'trial_end': timegm((datetime.now() + timedelta(days=30)).utctimetuple()), + 'current_period_start': timegm(datetime.utcnow().utctimetuple()), + 'current_period_end': timegm((datetime.utcnow() + timedelta(days=30)).utctimetuple()), + 'trial_start': timegm(datetime.utcnow().utctimetuple()), + 'trial_end': timegm((datetime.utcnow() + timedelta(days=30)).utctimetuple()), }) FAKE_CARD = AttrDict({ diff --git a/data/model/oauth.py b/data/model/oauth.py index b99a9cb58..309e2122a 100644 --- a/data/model/oauth.py +++ b/data/model/oauth.py @@ -65,7 +65,7 @@ class DatabaseAuthorizationProvider(AuthorizationProvider): .switch(OAuthAccessToken) .join(User) .where(OAuthApplication.client_id == client_id, User.username == username, - OAuthAccessToken.expires_at > datetime.now())) + OAuthAccessToken.expires_at > datetime.utcnow())) found = list(found) logger.debug('Found %s matching tokens.', len(found)) long_scope_string = ','.join([token.scope for token in found]) @@ -116,7 +116,7 @@ class DatabaseAuthorizationProvider(AuthorizationProvider): raise RuntimeError('Username must be in the data field') app = OAuthApplication.get(client_id=client_id) - expires_at = datetime.now() + timedelta(seconds=expires_in) + expires_at = datetime.utcnow() + timedelta(seconds=expires_in) OAuthAccessToken.create(application=app, authorized_user=user, scope=scope, access_token=access_token, token_type=token_type, expires_at=expires_at, refresh_token=refresh_token, data=data) @@ -274,7 +274,7 @@ def list_applications_for_org(org): def create_access_token_for_testing(user, client_id, scope): - expires_at = datetime.now() + timedelta(seconds=10000) + expires_at = datetime.utcnow() + timedelta(seconds=10000) application = get_application_for_client_id(client_id) OAuthAccessToken.create(application=application, 
authorized_user=user, scope=scope, token_type='token', access_token='test', diff --git a/data/queue.py b/data/queue.py index be0c8301b..44d7ad531 100644 --- a/data/queue.py +++ b/data/queue.py @@ -12,6 +12,7 @@ class WorkQueue(object): self._queue_name = queue_name self._reporter = reporter self._transaction_factory = transaction_factory + self._currently_processing = False if canonical_name_match_list is None: self._canonical_name_match_list = [] @@ -39,25 +40,21 @@ def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) - def _report_queue_metrics(self): - if self._reporter is None: - return - - now = datetime.now() - name_match_query = self._name_match_query() - - running_query = self._running_jobs(now, name_match_query) - running_count =running_query.distinct().count() - - avialable_query = self._available_jobs(now, name_match_query, running_query) - available_count = avialable_query.select(QueueItem.queue_name).distinct().count() - - - self._reporter(running_count, running_count + available_count) - def update_metrics(self): with self._transaction_factory(db): - self._report_queue_metrics() + if self._reporter is None: + return + + now = datetime.utcnow() + name_match_query = self._name_match_query() + + running_query = self._running_jobs(now, name_match_query) + running_count = running_query.distinct().count() + + available_query = self._available_jobs(now, name_match_query, running_query) + available_count = available_query.select(QueueItem.queue_name).distinct().count() + + self._reporter(self._currently_processing, running_count, running_count + available_count) def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ @@ -72,19 +69,18 @@ } if available_after: - available_date = datetime.now() + timedelta(seconds=available_after) + available_date = datetime.utcnow() + timedelta(seconds=available_after)
params['available_after'] = available_date with self._transaction_factory(db): QueueItem.create(**params) - self._report_queue_metrics() def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five minutes. """ - now = datetime.now() + now = datetime.utcnow() name_match_query = self._name_match_query() @@ -99,21 +95,22 @@ class WorkQueue(object): item.processing_expires = now + timedelta(seconds=processing_time) item.retries_remaining -= 1 item.save() - except QueueItem.DoesNotExist: - pass - self._report_queue_metrics() + self._currently_processing = True + except QueueItem.DoesNotExist: + self._currently_processing = False + pass return item def complete(self, completed_item): with self._transaction_factory(db): completed_item.delete_instance() - self._report_queue_metrics() + self._currently_processing = False def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): - retry_date = datetime.now() + timedelta(seconds=retry_after) + retry_date = datetime.utcnow() + timedelta(seconds=retry_after) incomplete_item.available_after = retry_date incomplete_item.available = True @@ -121,11 +118,11 @@ class WorkQueue(object): incomplete_item.retries_remaining += 1 incomplete_item.save() - self._report_queue_metrics() + self._currently_processing = False @staticmethod def extend_processing(queue_item, seconds_from_now): - new_expiration = datetime.now() + timedelta(seconds=seconds_from_now) + new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION: diff --git a/test/test_queue.py b/test/test_queue.py index 024a00d72..6c1660eb7 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -2,6 +2,8 @@ import unittest import json import time +from functools import wraps + from app import app from 
initdb import setup_database_for_testing, finished_database_for_testing from data.queue import WorkQueue @@ -12,14 +14,36 @@ QUEUE_NAME = 'testqueuename' class SaveLastCountReporter(object): def __init__(self): - self.running = None + self.currently_processing = None + self.running_count = None self.total = None - def __call__(self, running, total_jobs): - self.running = running + def __call__(self, currently_processing, running_count, total_jobs): + self.currently_processing = currently_processing + self.running_count = running_count self.total = total_jobs +class AutoUpdatingQueue(object): + def __init__(self, queue_to_wrap): + self._queue = queue_to_wrap + + def _wrapper(self, func): + @wraps(func) + def wrapper(*args, **kwargs): + to_return = func(*args, **kwargs) + self._queue.update_metrics() + return to_return + return wrapper + + def __getattr__(self, attr_name): + method_or_attr = getattr(self._queue, attr_name) + if callable(method_or_attr): + return self._wrapper(method_or_attr) + else: + return method_or_attr + + class QueueTestCase(unittest.TestCase): TEST_MESSAGE_1 = json.dumps({'data': 1}) TEST_MESSAGE_2 = json.dumps({'data': 2}) @@ -27,7 +51,8 @@ class QueueTestCase(unittest.TestCase): def setUp(self): self.reporter = SaveLastCountReporter() self.transaction_factory = app.config['DB_TRANSACTION_FACTORY'] - self.queue = WorkQueue(QUEUE_NAME, self.transaction_factory, reporter=self.reporter) + self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, + reporter=self.reporter)) setup_database_for_testing(self) def tearDown(self): @@ -36,51 +61,56 @@ class QueueTestCase(unittest.TestCase): class TestQueue(QueueTestCase): def test_same_canonical_names(self): - self.assertEqual(self.reporter.running, None) + self.assertEqual(self.reporter.currently_processing, None) + self.assertEqual(self.reporter.running_count, None) self.assertEqual(self.reporter.total, None) self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) 
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2) - self.assertEqual(self.reporter.running, 0) + self.assertEqual(self.reporter.currently_processing, False) + self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) one = self.queue.get() self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.currently_processing, True) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) two_fail = self.queue.get() self.assertEqual(None, two_fail) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) self.queue.complete(one) - self.assertEqual(self.reporter.running, 0) + self.assertEqual(self.reporter.currently_processing, False) + self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) two = self.queue.get() self.assertNotEqual(None, two) + self.assertEqual(self.reporter.currently_processing, True) self.assertEqual(self.TEST_MESSAGE_2, two.body) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) def test_different_canonical_names(self): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2) - self.assertEqual(self.reporter.running, 0) + self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 2) one = self.queue.get() self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 2) two = self.queue.get() self.assertNotEqual(None, two) self.assertEqual(self.TEST_MESSAGE_2, two.body) - self.assertEqual(self.reporter.running, 2) + self.assertEqual(self.reporter.running_count, 2) 
self.assertEqual(self.reporter.total, 2) def test_canonical_name(self): @@ -95,12 +125,12 @@ class TestQueue(QueueTestCase): def test_expiration(self): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) - self.assertEqual(self.reporter.running, 0) + self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) one = self.queue.get(processing_time=0.5) self.assertNotEqual(None, one) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) one_fail = self.queue.get() @@ -108,19 +138,19 @@ class TestQueue(QueueTestCase): time.sleep(1) self.queue.update_metrics() - self.assertEqual(self.reporter.running, 0) + self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) one_again = self.queue.get() self.assertNotEqual(None, one_again) - self.assertEqual(self.reporter.running, 1) + self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) def test_specialized_queue(self): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) self.queue.put(['def', 'def'], self.TEST_MESSAGE_2) - my_queue = WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']) + my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def'])) two = my_queue.get() self.assertNotEqual(None, two) diff --git a/util/analytics.py b/util/analytics.py index 4ae1d9db0..6dfdf923c 100644 --- a/util/analytics.py +++ b/util/analytics.py @@ -26,7 +26,7 @@ class SendToMixpanel(Process): self.daemon = True def run(self): - logger.debug('Starting sender process.') + logger.debug('Starting mixpanel sender process.') while True: mp_request = self._mp_queue.get() logger.debug('Got queued mixpanel reqeust.') diff --git a/util/queuemetrics.py b/util/queuemetrics.py index 10b5caf3b..5d969e9bb 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -1,27 +1,54 @@ import logging import boto +from multiprocessing import Process, Queue 
logger = logging.getLogger(__name__) class NullReporter(object): - def report(self, running_count, total_count): + def report(self, *args): pass -class CloudWatchReporter(object): - def __init__(self, aws_access_key, aws_secret_key, namespace, name): - self._connection = boto.connect_cloudwatch(aws_access_key, aws_secret_key) +class QueueingCloudWatchReporter(object): + def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): self._namespace = namespace - self._name = name + self._need_capacity_name = need_capacity_name + self._build_percent_name = build_percent_name + self._put_metrics_queue = request_queue - def report(self, running_count, total_count): + def _send_to_queue(self, *args, **kwargs): + self._put_metrics_queue.put((args, kwargs)) + + def report(self, currently_processing, running_count, total_count): logger.debug('Worker indicated %s running count and %s total count', running_count, total_count) + need_capacity_count = total_count - running_count - self._connection.put_metric_data(self._namespace, self._name, need_capacity_count, - unit='Count') + self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count, + unit='Count') + + building_percent = 100 if currently_processing else 0 + self._send_to_queue(self._namespace, self._build_percent_name, building_percent, + unit='Percent') + + +class SendToCloudWatch(Process): + def __init__(self, request_queue, aws_access_key, aws_secret_key): + Process.__init__(self) + self._aws_access_key = aws_access_key + self._aws_secret_key = aws_secret_key + self._put_metrics_queue = request_queue + self.daemon = True + + def run(self): + logger.debug('Starting cloudwatch sender process.') + connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) + while True: + put_metric_args, kwargs = self._put_metrics_queue.get() + logger.debug('Got queued put metrics request.') + connection.put_metric_data(*put_metric_args, **kwargs) class 
QueueMetrics(object): @@ -36,11 +63,17 @@ class QueueMetrics(object): analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') if analytics_type == 'CloudWatch': - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY', '') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY', '') - namespace = app.config.get('QUEUE_METRICS_NAMESPACE', '') - name = app.config.get('QUEUE_METRICS_NAME', '') - reporter = CloudWatchReporter(access_key, secret_key, namespace, name) + access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') + secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') + namespace = app.config.get('QUEUE_METRICS_NAMESPACE') + req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') + build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') + + request_queue = Queue() + reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) + sender = SendToCloudWatch(request_queue, access_key, secret_key) + sender.start() else: reporter = NullReporter() diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py index bbd326775..477e0282a 100644 --- a/workers/dockerfilebuild.py +++ b/workers/dockerfilebuild.py @@ -1,3 +1,7 @@ +import logging.config + +logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) + import logging import daemon import argparse @@ -25,12 +29,6 @@ from util.safetar import safe_extractall from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile -root_logger = logging.getLogger('') -root_logger.setLevel(logging.DEBUG) - -FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' -formatter = logging.Formatter(FORMAT) - logger = logging.getLogger(__name__) TIMEOUT_PERIOD_MINUTES = 20 @@ -558,8 +556,6 @@ parser.add_argument('--cachegb', default=20, type=float, help='Maximum cache size in gigabytes.') args = parser.parse_args() 
-logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) - worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=RESERVATION_TIME) worker.start(start_status_server_port=8000) diff --git a/workers/worker.py b/workers/worker.py index 112c4f6bc..094f2154f 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -124,6 +124,9 @@ class Worker(object): if not self._stop.is_set(): logger.debug('No more work.') + def update_queue_metrics(self): + self._queue.update_metrics() + def start(self, start_status_server_port=None): if start_status_server_port is not None: # Start a status server on a thread @@ -140,6 +143,7 @@ class Worker(object): self._sched.start() self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds, start_date=soon) + self._sched.add_interval_job(self.update_queue_metrics, seconds=60, start_date=soon) self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds) signal.signal(signal.SIGTERM, self.terminate)