Merge branch 'master' of https://bitbucket.org/yackob03/quay
This commit is contained in:
commit
a64c9a5d7f
10 changed files with 137 additions and 75 deletions
|
@ -8,6 +8,8 @@ RUN apt-get install -y git python-virtualenv python-dev libjpeg8 libjpeg62-dev l
|
||||||
|
|
||||||
### End common section ###
|
### End common section ###
|
||||||
|
|
||||||
|
RUN apt-get install -y libldap2-dev libsasl2-dev
|
||||||
|
|
||||||
RUN apt-get install -y lxc aufs-tools
|
RUN apt-get install -y lxc aufs-tools
|
||||||
|
|
||||||
RUN usermod -v 100000-200000 -w 100000-200000 root
|
RUN usermod -v 100000-200000 -w 100000-200000 root
|
||||||
|
|
|
@ -41,7 +41,7 @@ def _validate_and_apply_oauth_token(token):
|
||||||
}
|
}
|
||||||
abort(401, message='OAuth access token could not be validated: %(token)s',
|
abort(401, message='OAuth access token could not be validated: %(token)s',
|
||||||
issue='invalid-oauth-token', token=token, headers=authenticate_header)
|
issue='invalid-oauth-token', token=token, headers=authenticate_header)
|
||||||
elif validated.expires_at <= datetime.now():
|
elif validated.expires_at <= datetime.utcnow():
|
||||||
logger.info('OAuth access with an expired token: %s', token)
|
logger.info('OAuth access with an expired token: %s', token)
|
||||||
authenticate_header = {
|
authenticate_header = {
|
||||||
'WWW-Authenticate': ('Bearer error="invalid_token", '
|
'WWW-Authenticate': ('Bearer error="invalid_token", '
|
||||||
|
|
|
@ -131,10 +131,10 @@ class FakeStripe(object):
|
||||||
|
|
||||||
FAKE_SUBSCRIPTION = AttrDict({
|
FAKE_SUBSCRIPTION = AttrDict({
|
||||||
'plan': FAKE_PLAN,
|
'plan': FAKE_PLAN,
|
||||||
'current_period_start': timegm(datetime.now().utctimetuple()),
|
'current_period_start': timegm(datetime.utcnow().utctimetuple()),
|
||||||
'current_period_end': timegm((datetime.now() + timedelta(days=30)).utctimetuple()),
|
'current_period_end': timegm((datetime.utcnow() + timedelta(days=30)).utctimetuple()),
|
||||||
'trial_start': timegm(datetime.now().utctimetuple()),
|
'trial_start': timegm(datetime.utcnow().utctimetuple()),
|
||||||
'trial_end': timegm((datetime.now() + timedelta(days=30)).utctimetuple()),
|
'trial_end': timegm((datetime.utcnow() + timedelta(days=30)).utctimetuple()),
|
||||||
})
|
})
|
||||||
|
|
||||||
FAKE_CARD = AttrDict({
|
FAKE_CARD = AttrDict({
|
||||||
|
|
|
@ -65,7 +65,7 @@ class DatabaseAuthorizationProvider(AuthorizationProvider):
|
||||||
.switch(OAuthAccessToken)
|
.switch(OAuthAccessToken)
|
||||||
.join(User)
|
.join(User)
|
||||||
.where(OAuthApplication.client_id == client_id, User.username == username,
|
.where(OAuthApplication.client_id == client_id, User.username == username,
|
||||||
OAuthAccessToken.expires_at > datetime.now()))
|
OAuthAccessToken.expires_at > datetime.utcnow()))
|
||||||
found = list(found)
|
found = list(found)
|
||||||
logger.debug('Found %s matching tokens.', len(found))
|
logger.debug('Found %s matching tokens.', len(found))
|
||||||
long_scope_string = ','.join([token.scope for token in found])
|
long_scope_string = ','.join([token.scope for token in found])
|
||||||
|
@ -116,7 +116,7 @@ class DatabaseAuthorizationProvider(AuthorizationProvider):
|
||||||
raise RuntimeError('Username must be in the data field')
|
raise RuntimeError('Username must be in the data field')
|
||||||
|
|
||||||
app = OAuthApplication.get(client_id=client_id)
|
app = OAuthApplication.get(client_id=client_id)
|
||||||
expires_at = datetime.now() + timedelta(seconds=expires_in)
|
expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
|
||||||
OAuthAccessToken.create(application=app, authorized_user=user, scope=scope,
|
OAuthAccessToken.create(application=app, authorized_user=user, scope=scope,
|
||||||
access_token=access_token, token_type=token_type,
|
access_token=access_token, token_type=token_type,
|
||||||
expires_at=expires_at, refresh_token=refresh_token, data=data)
|
expires_at=expires_at, refresh_token=refresh_token, data=data)
|
||||||
|
@ -274,7 +274,7 @@ def list_applications_for_org(org):
|
||||||
|
|
||||||
|
|
||||||
def create_access_token_for_testing(user, client_id, scope):
|
def create_access_token_for_testing(user, client_id, scope):
|
||||||
expires_at = datetime.now() + timedelta(seconds=10000)
|
expires_at = datetime.utcnow() + timedelta(seconds=10000)
|
||||||
application = get_application_for_client_id(client_id)
|
application = get_application_for_client_id(client_id)
|
||||||
OAuthAccessToken.create(application=application, authorized_user=user, scope=scope,
|
OAuthAccessToken.create(application=application, authorized_user=user, scope=scope,
|
||||||
token_type='token', access_token='test',
|
token_type='token', access_token='test',
|
||||||
|
|
|
@ -12,6 +12,7 @@ class WorkQueue(object):
|
||||||
self._queue_name = queue_name
|
self._queue_name = queue_name
|
||||||
self._reporter = reporter
|
self._reporter = reporter
|
||||||
self._transaction_factory = transaction_factory
|
self._transaction_factory = transaction_factory
|
||||||
|
self._currently_processing = False
|
||||||
|
|
||||||
if canonical_name_match_list is None:
|
if canonical_name_match_list is None:
|
||||||
self._canonical_name_match_list = []
|
self._canonical_name_match_list = []
|
||||||
|
@ -39,25 +40,21 @@ class WorkQueue(object):
|
||||||
def _name_match_query(self):
|
def _name_match_query(self):
|
||||||
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
||||||
|
|
||||||
def _report_queue_metrics(self):
|
|
||||||
if self._reporter is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
now = datetime.now()
|
|
||||||
name_match_query = self._name_match_query()
|
|
||||||
|
|
||||||
running_query = self._running_jobs(now, name_match_query)
|
|
||||||
running_count =running_query.distinct().count()
|
|
||||||
|
|
||||||
avialable_query = self._available_jobs(now, name_match_query, running_query)
|
|
||||||
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
|
|
||||||
|
|
||||||
|
|
||||||
self._reporter(running_count, running_count + available_count)
|
|
||||||
|
|
||||||
def update_metrics(self):
|
def update_metrics(self):
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
self._report_queue_metrics()
|
if self._reporter is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = datetime.utcnow()
|
||||||
|
name_match_query = self._name_match_query()
|
||||||
|
|
||||||
|
running_query = self._running_jobs(now, name_match_query)
|
||||||
|
running_count =running_query.distinct().count()
|
||||||
|
|
||||||
|
avialable_query = self._available_jobs(now, name_match_query, running_query)
|
||||||
|
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
|
||||||
|
|
||||||
|
self._reporter(self._currently_processing, running_count, running_count + available_count)
|
||||||
|
|
||||||
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
||||||
"""
|
"""
|
||||||
|
@ -72,19 +69,18 @@ class WorkQueue(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
if available_after:
|
if available_after:
|
||||||
available_date = datetime.now() + timedelta(seconds=available_after)
|
available_date = datetime.utcnow() + timedelta(seconds=available_after)
|
||||||
params['available_after'] = available_date
|
params['available_after'] = available_date
|
||||||
|
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
QueueItem.create(**params)
|
QueueItem.create(**params)
|
||||||
self._report_queue_metrics()
|
|
||||||
|
|
||||||
def get(self, processing_time=300):
|
def get(self, processing_time=300):
|
||||||
"""
|
"""
|
||||||
Get an available item and mark it as unavailable for the default of five
|
Get an available item and mark it as unavailable for the default of five
|
||||||
minutes.
|
minutes.
|
||||||
"""
|
"""
|
||||||
now = datetime.now()
|
now = datetime.utcnow()
|
||||||
|
|
||||||
name_match_query = self._name_match_query()
|
name_match_query = self._name_match_query()
|
||||||
|
|
||||||
|
@ -99,21 +95,22 @@ class WorkQueue(object):
|
||||||
item.processing_expires = now + timedelta(seconds=processing_time)
|
item.processing_expires = now + timedelta(seconds=processing_time)
|
||||||
item.retries_remaining -= 1
|
item.retries_remaining -= 1
|
||||||
item.save()
|
item.save()
|
||||||
except QueueItem.DoesNotExist:
|
|
||||||
pass
|
|
||||||
|
|
||||||
self._report_queue_metrics()
|
self._currently_processing = True
|
||||||
|
except QueueItem.DoesNotExist:
|
||||||
|
self._currently_processing = False
|
||||||
|
pass
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
|
||||||
def complete(self, completed_item):
|
def complete(self, completed_item):
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
completed_item.delete_instance()
|
completed_item.delete_instance()
|
||||||
self._report_queue_metrics()
|
self._currently_processing = False
|
||||||
|
|
||||||
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
retry_date = datetime.now() + timedelta(seconds=retry_after)
|
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
|
||||||
incomplete_item.available_after = retry_date
|
incomplete_item.available_after = retry_date
|
||||||
incomplete_item.available = True
|
incomplete_item.available = True
|
||||||
|
|
||||||
|
@ -121,11 +118,11 @@ class WorkQueue(object):
|
||||||
incomplete_item.retries_remaining += 1
|
incomplete_item.retries_remaining += 1
|
||||||
|
|
||||||
incomplete_item.save()
|
incomplete_item.save()
|
||||||
self._report_queue_metrics()
|
self._currently_processing = False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def extend_processing(queue_item, seconds_from_now):
|
def extend_processing(queue_item, seconds_from_now):
|
||||||
new_expiration = datetime.now() + timedelta(seconds=seconds_from_now)
|
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||||
|
|
||||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||||
if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
|
if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
|
||||||
|
|
|
@ -2,6 +2,8 @@ import unittest
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from data.queue import WorkQueue
|
from data.queue import WorkQueue
|
||||||
|
@ -12,14 +14,36 @@ QUEUE_NAME = 'testqueuename'
|
||||||
|
|
||||||
class SaveLastCountReporter(object):
|
class SaveLastCountReporter(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.running = None
|
self.currently_processing = None
|
||||||
|
self.running_count = None
|
||||||
self.total = None
|
self.total = None
|
||||||
|
|
||||||
def __call__(self, running, total_jobs):
|
def __call__(self, currently_processing, running_count, total_jobs):
|
||||||
self.running = running
|
self.currently_processing = currently_processing
|
||||||
|
self.running_count = running_count
|
||||||
self.total = total_jobs
|
self.total = total_jobs
|
||||||
|
|
||||||
|
|
||||||
|
class AutoUpdatingQueue(object):
|
||||||
|
def __init__(self, queue_to_wrap):
|
||||||
|
self._queue = queue_to_wrap
|
||||||
|
|
||||||
|
def _wrapper(self, func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
to_return = func(*args, **kwargs)
|
||||||
|
self._queue.update_metrics()
|
||||||
|
return to_return
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
def __getattr__(self, attr_name):
|
||||||
|
method_or_attr = getattr(self._queue, attr_name)
|
||||||
|
if callable(method_or_attr):
|
||||||
|
return self._wrapper(method_or_attr)
|
||||||
|
else:
|
||||||
|
return method_or_attr
|
||||||
|
|
||||||
|
|
||||||
class QueueTestCase(unittest.TestCase):
|
class QueueTestCase(unittest.TestCase):
|
||||||
TEST_MESSAGE_1 = json.dumps({'data': 1})
|
TEST_MESSAGE_1 = json.dumps({'data': 1})
|
||||||
TEST_MESSAGE_2 = json.dumps({'data': 2})
|
TEST_MESSAGE_2 = json.dumps({'data': 2})
|
||||||
|
@ -27,7 +51,8 @@ class QueueTestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.reporter = SaveLastCountReporter()
|
self.reporter = SaveLastCountReporter()
|
||||||
self.transaction_factory = app.config['DB_TRANSACTION_FACTORY']
|
self.transaction_factory = app.config['DB_TRANSACTION_FACTORY']
|
||||||
self.queue = WorkQueue(QUEUE_NAME, self.transaction_factory, reporter=self.reporter)
|
self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory,
|
||||||
|
reporter=self.reporter))
|
||||||
setup_database_for_testing(self)
|
setup_database_for_testing(self)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
@ -36,51 +61,56 @@ class QueueTestCase(unittest.TestCase):
|
||||||
|
|
||||||
class TestQueue(QueueTestCase):
|
class TestQueue(QueueTestCase):
|
||||||
def test_same_canonical_names(self):
|
def test_same_canonical_names(self):
|
||||||
self.assertEqual(self.reporter.running, None)
|
self.assertEqual(self.reporter.currently_processing, None)
|
||||||
|
self.assertEqual(self.reporter.running_count, None)
|
||||||
self.assertEqual(self.reporter.total, None)
|
self.assertEqual(self.reporter.total, None)
|
||||||
|
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2)
|
||||||
self.assertEqual(self.reporter.running, 0)
|
self.assertEqual(self.reporter.currently_processing, False)
|
||||||
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
one = self.queue.get()
|
one = self.queue.get()
|
||||||
self.assertNotEqual(None, one)
|
self.assertNotEqual(None, one)
|
||||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.currently_processing, True)
|
||||||
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
two_fail = self.queue.get()
|
two_fail = self.queue.get()
|
||||||
self.assertEqual(None, two_fail)
|
self.assertEqual(None, two_fail)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
self.queue.complete(one)
|
self.queue.complete(one)
|
||||||
self.assertEqual(self.reporter.running, 0)
|
self.assertEqual(self.reporter.currently_processing, False)
|
||||||
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
two = self.queue.get()
|
two = self.queue.get()
|
||||||
self.assertNotEqual(None, two)
|
self.assertNotEqual(None, two)
|
||||||
|
self.assertEqual(self.reporter.currently_processing, True)
|
||||||
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
def test_different_canonical_names(self):
|
def test_different_canonical_names(self):
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
||||||
self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2)
|
self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2)
|
||||||
self.assertEqual(self.reporter.running, 0)
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 2)
|
self.assertEqual(self.reporter.total, 2)
|
||||||
|
|
||||||
one = self.queue.get()
|
one = self.queue.get()
|
||||||
self.assertNotEqual(None, one)
|
self.assertNotEqual(None, one)
|
||||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 2)
|
self.assertEqual(self.reporter.total, 2)
|
||||||
|
|
||||||
two = self.queue.get()
|
two = self.queue.get()
|
||||||
self.assertNotEqual(None, two)
|
self.assertNotEqual(None, two)
|
||||||
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
||||||
self.assertEqual(self.reporter.running, 2)
|
self.assertEqual(self.reporter.running_count, 2)
|
||||||
self.assertEqual(self.reporter.total, 2)
|
self.assertEqual(self.reporter.total, 2)
|
||||||
|
|
||||||
def test_canonical_name(self):
|
def test_canonical_name(self):
|
||||||
|
@ -95,12 +125,12 @@ class TestQueue(QueueTestCase):
|
||||||
|
|
||||||
def test_expiration(self):
|
def test_expiration(self):
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
||||||
self.assertEqual(self.reporter.running, 0)
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
one = self.queue.get(processing_time=0.5)
|
one = self.queue.get(processing_time=0.5)
|
||||||
self.assertNotEqual(None, one)
|
self.assertNotEqual(None, one)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
one_fail = self.queue.get()
|
one_fail = self.queue.get()
|
||||||
|
@ -108,19 +138,19 @@ class TestQueue(QueueTestCase):
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.queue.update_metrics()
|
self.queue.update_metrics()
|
||||||
self.assertEqual(self.reporter.running, 0)
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
one_again = self.queue.get()
|
one_again = self.queue.get()
|
||||||
self.assertNotEqual(None, one_again)
|
self.assertNotEqual(None, one_again)
|
||||||
self.assertEqual(self.reporter.running, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
def test_specialized_queue(self):
|
def test_specialized_queue(self):
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
||||||
self.queue.put(['def', 'def'], self.TEST_MESSAGE_2)
|
self.queue.put(['def', 'def'], self.TEST_MESSAGE_2)
|
||||||
|
|
||||||
my_queue = WorkQueue(QUEUE_NAME, self.transaction_factory, ['def'])
|
my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']))
|
||||||
|
|
||||||
two = my_queue.get()
|
two = my_queue.get()
|
||||||
self.assertNotEqual(None, two)
|
self.assertNotEqual(None, two)
|
||||||
|
|
|
@ -26,7 +26,7 @@ class SendToMixpanel(Process):
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug('Starting sender process.')
|
logger.debug('Starting mixpanel sender process.')
|
||||||
while True:
|
while True:
|
||||||
mp_request = self._mp_queue.get()
|
mp_request = self._mp_queue.get()
|
||||||
logger.debug('Got queued mixpanel reqeust.')
|
logger.debug('Got queued mixpanel reqeust.')
|
||||||
|
|
|
@ -1,27 +1,54 @@
|
||||||
import logging
|
import logging
|
||||||
import boto
|
import boto
|
||||||
|
|
||||||
|
from multiprocessing import Process, Queue
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class NullReporter(object):
|
class NullReporter(object):
|
||||||
def report(self, running_count, total_count):
|
def report(self, *args):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CloudWatchReporter(object):
|
class QueueingCloudWatchReporter(object):
|
||||||
def __init__(self, aws_access_key, aws_secret_key, namespace, name):
|
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
|
||||||
self._connection = boto.connect_cloudwatch(aws_access_key, aws_secret_key)
|
|
||||||
self._namespace = namespace
|
self._namespace = namespace
|
||||||
self._name = name
|
self._need_capacity_name = need_capacity_name
|
||||||
|
self._build_percent_name = build_percent_name
|
||||||
|
self._put_metrics_queue = request_queue
|
||||||
|
|
||||||
def report(self, running_count, total_count):
|
def _send_to_queue(self, *args, **kwargs):
|
||||||
|
self._put_metrics_queue.put((args, kwargs))
|
||||||
|
|
||||||
|
def report(self, currently_processing, running_count, total_count):
|
||||||
logger.debug('Worker indicated %s running count and %s total count', running_count,
|
logger.debug('Worker indicated %s running count and %s total count', running_count,
|
||||||
total_count)
|
total_count)
|
||||||
|
|
||||||
need_capacity_count = total_count - running_count
|
need_capacity_count = total_count - running_count
|
||||||
self._connection.put_metric_data(self._namespace, self._name, need_capacity_count,
|
self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count,
|
||||||
unit='Count')
|
unit='Count')
|
||||||
|
|
||||||
|
building_percent = 100 if currently_processing else 0
|
||||||
|
self._send_to_queue(self._namespace, self._build_percent_name, building_percent,
|
||||||
|
unit='Percent')
|
||||||
|
|
||||||
|
|
||||||
|
class SendToCloudWatch(Process):
|
||||||
|
def __init__(self, request_queue, aws_access_key, aws_secret_key):
|
||||||
|
Process.__init__(self)
|
||||||
|
self._aws_access_key = aws_access_key
|
||||||
|
self._aws_secret_key = aws_secret_key
|
||||||
|
self._put_metrics_queue = request_queue
|
||||||
|
self.daemon = True
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
logger.debug('Starting cloudwatch sender process.')
|
||||||
|
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
|
||||||
|
while True:
|
||||||
|
put_metric_args, kwargs = self._put_metrics_queue.get()
|
||||||
|
logger.debug('Got queued put metrics reqeust.')
|
||||||
|
connection.put_metric_data(*put_metric_args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class QueueMetrics(object):
|
class QueueMetrics(object):
|
||||||
|
@ -36,11 +63,17 @@ class QueueMetrics(object):
|
||||||
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
||||||
|
|
||||||
if analytics_type == 'CloudWatch':
|
if analytics_type == 'CloudWatch':
|
||||||
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY', '')
|
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
|
||||||
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY', '')
|
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
|
||||||
namespace = app.config.get('QUEUE_METRICS_NAMESPACE', '')
|
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
|
||||||
name = app.config.get('QUEUE_METRICS_NAME', '')
|
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
|
||||||
reporter = CloudWatchReporter(access_key, secret_key, namespace, name)
|
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
|
||||||
|
|
||||||
|
request_queue = Queue()
|
||||||
|
reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name,
|
||||||
|
build_percent_name)
|
||||||
|
sender = SendToCloudWatch(request_queue, access_key, secret_key)
|
||||||
|
sender.start()
|
||||||
else:
|
else:
|
||||||
reporter = NullReporter()
|
reporter = NullReporter()
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import daemon
|
import daemon
|
||||||
import argparse
|
import argparse
|
||||||
|
@ -25,12 +29,6 @@ from util.safetar import safe_extractall
|
||||||
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
|
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
|
||||||
|
|
||||||
|
|
||||||
root_logger = logging.getLogger('')
|
|
||||||
root_logger.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
|
|
||||||
formatter = logging.Formatter(FORMAT)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
TIMEOUT_PERIOD_MINUTES = 20
|
TIMEOUT_PERIOD_MINUTES = 20
|
||||||
|
@ -558,8 +556,6 @@ parser.add_argument('--cachegb', default=20, type=float,
|
||||||
help='Maximum cache size in gigabytes.')
|
help='Maximum cache size in gigabytes.')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
|
||||||
|
|
||||||
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
|
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
|
||||||
reservation_seconds=RESERVATION_TIME)
|
reservation_seconds=RESERVATION_TIME)
|
||||||
worker.start(start_status_server_port=8000)
|
worker.start(start_status_server_port=8000)
|
||||||
|
|
|
@ -124,6 +124,9 @@ class Worker(object):
|
||||||
if not self._stop.is_set():
|
if not self._stop.is_set():
|
||||||
logger.debug('No more work.')
|
logger.debug('No more work.')
|
||||||
|
|
||||||
|
def update_queue_metrics(self):
|
||||||
|
self._queue.update_metrics()
|
||||||
|
|
||||||
def start(self, start_status_server_port=None):
|
def start(self, start_status_server_port=None):
|
||||||
if start_status_server_port is not None:
|
if start_status_server_port is not None:
|
||||||
# Start a status server on a thread
|
# Start a status server on a thread
|
||||||
|
@ -140,6 +143,7 @@ class Worker(object):
|
||||||
self._sched.start()
|
self._sched.start()
|
||||||
self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds,
|
self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds,
|
||||||
start_date=soon)
|
start_date=soon)
|
||||||
|
self._sched.add_interval_job(self.update_queue_metrics, seconds=60, start_date=soon)
|
||||||
self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
|
self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, self.terminate)
|
signal.signal(signal.SIGTERM, self.terminate)
|
||||||
|
|
Reference in a new issue