Add a queue capacity reporter plugin to the queue. Move the queue definitions to app. Add a cloudwatch reporter to the dockerfile build queue.

This commit is contained in:
Jake Moshenko 2014-05-21 19:50:37 -04:00
parent 512a17363f
commit d14798de1d
11 changed files with 171 additions and 44 deletions

8
app.py
View file

@@ -12,8 +12,10 @@ from storage import Storage
from data.userfiles import Userfiles
from util.analytics import Analytics
from util.exceptionlog import Sentry
from util.queuemetrics import QueueMetrics
from data.billing import Billing
from data.buildlogs import BuildLogs
from data.queue import WorkQueue
OVERRIDE_CONFIG_FILENAME = 'conf/stack/config.py'
@ -48,3 +50,9 @@ analytics = Analytics(app)
billing = Billing(app)
sentry = Sentry(app)
build_logs = BuildLogs(app)
queue_metrics = QueueMetrics(app)
image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'],
reporter=queue_metrics.report)
webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'])

View file

@ -86,7 +86,10 @@ class DefaultConfig(object):
USERFILES_PATH = 'test/data/registry/userfiles'
# Analytics
ANALYTICS_TYPE = "FakeAnalytics"
ANALYTICS_TYPE = 'FakeAnalytics'
# Build Queue Metrics
QUEUE_METRICS_TYPE = 'Null'
# Exception logging
EXCEPTION_LOG_TYPE = 'FakeSentry'

View file

@@ -11,18 +11,53 @@ MINIMUM_EXTENSION = timedelta(seconds=20)
class WorkQueue(object):
def __init__(self, queue_name, canonical_name_match_list=None):
self.queue_name = queue_name
def __init__(self, queue_name, canonical_name_match_list=None, reporter=None):
self._queue_name = queue_name
self._reporter = reporter
if canonical_name_match_list is None:
self.canonical_name_match_list = []
self._canonical_name_match_list = []
else:
self.canonical_name_match_list = canonical_name_match_list
self._canonical_name_match_list = canonical_name_match_list
@staticmethod
def _canonical_name(name_list):
# Join name components into a slash-delimited name with a trailing '/',
# e.g. ['builds', 'abc'] -> 'builds/abc/'. The trailing slash keeps the
# '%'-suffix LIKE prefix match (see _name_match_query) from conflating
# 'abc' with 'abcdef'.
return '/'.join(name_list) + '/'
def _running_jobs(self, now, name_match_query):
# Query for the queue names of items currently being processed: claimed
# (available == False) and whose processing lease has not yet expired.
# `**` is the ORM's LIKE operator; name_match_query is a '%'-pattern.
# Returns an unevaluated select so callers can add .distinct()/.count().
return (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
def _name_match_query(self):
# Build the SQL LIKE pattern covering this queue's canonical prefix
# (queue name plus any configured sub-name components), e.g. 'builds/%'.
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
def _report_queue_metrics(self):
# Push (running, total) job counts to the configured reporter, if any.
# Intended to run inside a transaction so both counts see one snapshot.
if self._reporter is None:
return
now = datetime.now()
name_match_query = self._name_match_query()
# Total "live" jobs: already available, or still being processed, or
# failed but with retries left; items past available_after only.
total_jobs = (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.queue_name ** name_match_query,
QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires > now) |
(QueueItem.retries_remaining > 0)))
.distinct()
.count())
# Jobs currently claimed by a worker with an unexpired lease.
running = self._running_jobs(now, name_match_query).distinct().count()
self._reporter(running, total_jobs)
def update_metrics(self):
# Public entry point to re-report metrics without mutating the queue
# (e.g. after a processing lease expires with no get/put activity).
with transaction_factory(db):
self._report_queue_metrics()
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
@ -30,7 +65,7 @@ class WorkQueue(object):
"""
params = {
'queue_name': self._canonical_name([self.queue_name] + canonical_name_list),
'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
'body': message,
'retries_remaining': retries_remaining,
}
@ -39,7 +74,9 @@ class WorkQueue(object):
available_date = datetime.now() + timedelta(seconds=available_after)
params['available_after'] = available_date
QueueItem.create(**params)
with transaction_factory(db):
QueueItem.create(**params)
self._report_queue_metrics()
def get(self, processing_time=300):
"""
@ -48,15 +85,10 @@ class WorkQueue(object):
"""
now = datetime.now()
name_match_query = '%s%%' % self._canonical_name([self.queue_name] +
self.canonical_name_match_list)
name_match_query = self._name_match_query()
with transaction_factory(db):
running = (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
running = self._running_jobs(now, name_match_query)
avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query,
QueueItem.available_after <= now,
@ -67,6 +99,8 @@ class WorkQueue(object):
found = list(avail.limit(1).order_by(QueueItem.id))
item = None
if found:
item = found[0]
item.available = False
@ -74,24 +108,26 @@ class WorkQueue(object):
item.retries_remaining -= 1
item.save()
return item
self._report_queue_metrics()
return None
return item
@staticmethod
def complete(completed_item):
completed_item.delete_instance()
def complete(self, completed_item):
# Remove a finished item from the queue and re-report metrics in the
# same transaction, so the reported counts match the post-delete state.
with transaction_factory(db):
completed_item.delete_instance()
self._report_queue_metrics()
@staticmethod
def incomplete(incomplete_item, retry_after=300, restore_retry=False):
retry_date = datetime.now() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date
incomplete_item.available = True
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with transaction_factory(db):
retry_date = datetime.now() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date
incomplete_item.available = True
if restore_retry:
incomplete_item.retries_remaining += 1
if restore_retry:
incomplete_item.retries_remaining += 1
incomplete_item.save()
incomplete_item.save()
self._report_queue_metrics()
@staticmethod
def extend_processing(queue_item, seconds_from_now):
@ -101,8 +137,3 @@ class WorkQueue(object):
if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
queue_item.processing_expires = new_expiration
queue_item.save()
image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'])
webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'])

View file

@ -9,8 +9,7 @@ from flask.ext.principal import identity_changed
from random import SystemRandom
from data import model
from data.queue import dockerfile_build_queue
from app import app, login_manager
from app import app, login_manager, dockerfile_build_queue
from auth.permissions import QuayDeferredPermissionUser
from auth import scopes
from endpoints.api.discovery import swagger_route_data

View file

@ -8,8 +8,7 @@ from collections import OrderedDict
from data import model
from data.model import oauth
from data.queue import webhook_queue
from app import analytics, app
from app import analytics, app, webhook_queue
from auth.auth import process_auth
from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token
from util.names import parse_repository_name

View file

@ -7,9 +7,7 @@ from functools import wraps
from datetime import datetime
from time import time
from data.queue import image_diff_queue
from app import storage as store
from app import storage as store, image_diff_queue
from auth.auth import process_auth, extract_namespace_repo_from_session
from util import checksums, changes
from util.http import abort

View file

@ -9,12 +9,23 @@ from data.queue import WorkQueue
QUEUE_NAME = 'testqueuename'
class SaveLastCountReporter(object):
    """Test double for a queue metrics reporter.

    Callable with the (running, total_jobs) signature the WorkQueue
    reporter hook expects; simply remembers the most recent counts so
    tests can assert on them. Both counts start as None (no report yet).
    """

    def __init__(self):
        self.running = self.total = None

    def __call__(self, running, total_jobs):
        # Overwrite with the latest report; only the last call is kept.
        self.running, self.total = running, total_jobs
class QueueTestCase(unittest.TestCase):
TEST_MESSAGE_1 = json.dumps({'data': 1})
TEST_MESSAGE_2 = json.dumps({'data': 2})
def setUp(self):
self.queue = WorkQueue(QUEUE_NAME)
self.reporter = SaveLastCountReporter()
self.queue = WorkQueue(QUEUE_NAME, reporter=self.reporter)
setup_database_for_testing(self)
def tearDown(self):
@ -23,33 +34,52 @@ class QueueTestCase(unittest.TestCase):
class TestQueue(QueueTestCase):
def test_same_canonical_names(self):
self.assertEqual(self.reporter.running, None)
self.assertEqual(self.reporter.total, None)
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2)
self.assertEqual(self.reporter.running, 0)
self.assertEqual(self.reporter.total, 1)
one = self.queue.get()
self.assertNotEqual(None, one)
self.assertEqual(self.TEST_MESSAGE_1, one.body)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 1)
two_fail = self.queue.get()
self.assertEqual(None, two_fail)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 1)
self.queue.complete(one)
self.assertEqual(self.reporter.running, 0)
self.assertEqual(self.reporter.total, 1)
two = self.queue.get()
self.assertNotEqual(None, two)
self.assertEqual(self.TEST_MESSAGE_2, two.body)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 1)
def test_different_canonical_names(self):
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2)
self.assertEqual(self.reporter.running, 0)
self.assertEqual(self.reporter.total, 2)
one = self.queue.get()
self.assertNotEqual(None, one)
self.assertEqual(self.TEST_MESSAGE_1, one.body)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 2)
two = self.queue.get()
self.assertNotEqual(None, two)
self.assertEqual(self.TEST_MESSAGE_2, two.body)
self.assertEqual(self.reporter.running, 2)
self.assertEqual(self.reporter.total, 2)
def test_canonical_name(self):
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
@ -63,17 +93,26 @@ class TestQueue(QueueTestCase):
def test_expiration(self):
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
self.assertEqual(self.reporter.running, 0)
self.assertEqual(self.reporter.total, 1)
one = self.queue.get(processing_time=0.5)
self.assertNotEqual(None, one)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 1)
one_fail = self.queue.get()
self.assertEqual(None, one_fail)
time.sleep(1)
self.queue.update_metrics()
self.assertEqual(self.reporter.running, 0)
self.assertEqual(self.reporter.total, 1)
one_again = self.queue.get()
self.assertNotEqual(None, one_again)
self.assertEqual(self.reporter.running, 1)
self.assertEqual(self.reporter.total, 1)
def test_specialized_queue(self):
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)

51
util/queuemetrics.py Normal file
View file

@@ -0,0 +1,51 @@
import logging
import boto
logger = logging.getLogger(__name__)
class NullReporter(object):
    """No-op metrics reporter, used when queue metrics are disabled."""

    def report(self, running_count, total_count):
        """Discard the counts; nothing is recorded anywhere."""
        return None
class CloudWatchReporter(object):
    """Reports queue depth to AWS CloudWatch.

    Each report publishes a single 'Count' metric under the given
    namespace/name whose value is total_count - running_count, i.e. the
    number of jobs waiting for worker capacity.
    """

    def __init__(self, aws_access_key, aws_secret_key, namespace, name):
        # Connection is created eagerly at construction time.
        self._connection = boto.connect_cloudwatch(aws_access_key, aws_secret_key)
        self._namespace = namespace
        self._name = name

    def report(self, running_count, total_count):
        """Publish the current need-capacity count to CloudWatch."""
        waiting = total_count - running_count
        self._connection.put_metric_data(self._namespace, self._name, waiting,
                                         unit='Count')
class QueueMetrics(object):
    """Flask extension that selects and exposes a queue metrics reporter.

    Based on the QUEUE_METRICS_TYPE config value ('CloudWatch', or
    anything else for the no-op NullReporter), builds a reporter and
    proxies attribute access to it, so callers can pass e.g.
    `queue_metrics.report` as a WorkQueue reporter callback.
    """

    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.state = self.init_app(app)
        else:
            # Deferred init: caller must invoke init_app(app) later.
            self.state = None

    def init_app(self, app):
        """Build the configured reporter and register it on the app.

        Returns the reporter instance (also stored in
        app.extensions['queuemetrics']).
        """
        # Renamed from `analytics_type` — a copy/paste leftover from the
        # Analytics extension; this reads the queue-metrics config key.
        metrics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
        if metrics_type == 'CloudWatch':
            access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY', '')
            secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY', '')
            namespace = app.config.get('QUEUE_METRICS_NAMESPACE', '')
            name = app.config.get('QUEUE_METRICS_NAME', '')
            reporter = CloudWatchReporter(access_key, secret_key, namespace, name)
        else:
            reporter = NullReporter()

        # Register extension with app (standard Flask extension pattern).
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['queuemetrics'] = reporter
        return reporter

    def __getattr__(self, name):
        # Proxy unknown attributes to the active reporter. NOTE(review):
        # yields None (not AttributeError) when the attribute is missing or
        # init_app was never called — callers silently get None; confirm
        # this leniency is intentional before tightening.
        return getattr(self.state, name, None)

View file

@ -1,7 +1,7 @@
import logging
import argparse
from data.queue import image_diff_queue
from app import image_diff_queue
from data.model import DataModelException
from endpoints.registry import process_image_changes
from workers.worker import Worker

View file

@ -18,10 +18,9 @@ from threading import Event
from uuid import uuid4
from collections import defaultdict
from data.queue import dockerfile_build_queue
from data import model
from workers.worker import Worker, WorkerUnhealthyException, JobException
from app import userfiles as user_files, build_logs, sentry
from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
from util.safetar import safe_extractall
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile

View file

@ -3,7 +3,7 @@ import argparse
import requests
import json
from data.queue import webhook_queue
from app import webhook_queue
from workers.worker import Worker