Merge pull request #2207 from jakedt/queueisslow
Improve queue performance hopefully
commit 6456e95b24

5 changed files with 9 additions and 4 deletions
@@ -105,6 +105,7 @@ class DefaultConfig(object):
 
   # Build Queue Metrics
   QUEUE_METRICS_TYPE = 'Null'
+  QUEUE_WORKER_METRICS_REFRESH_SECONDS = 300
 
   # Exception logging
   EXCEPTION_LOG_TYPE = 'FakeSentry'
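The hunk above introduces QUEUE_WORKER_METRICS_REFRESH_SECONDS (default 300 seconds) next to the existing queue-metrics settings; the QueueWorker hunk further down reads it through app.config in place of a hard-coded 60. A minimal sketch of how a DefaultConfig-style class surfaces through app.config, assuming a Flask app as that app.config lookup implies, with the class trimmed to the two settings shown:

from flask import Flask

class DefaultConfig(object):
  # Build Queue Metrics (other settings omitted)
  QUEUE_METRICS_TYPE = 'Null'
  QUEUE_WORKER_METRICS_REFRESH_SECONDS = 300

app = Flask(__name__)
app.config.from_object(DefaultConfig)  # copies the UPPERCASE attributes
print(app.config['QUEUE_WORKER_METRICS_REFRESH_SECONDS'])  # -> 300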
@@ -758,6 +758,11 @@ class QueueItem(BaseModel):
   processing_expires = DateTimeField(null=True, index=True)
   retries_remaining = IntegerField(default=5, index=True)
 
+  class Meta:
+    database = db
+    read_slaves = (read_slave,)
+    only_save_dirty = True
+
 
 class RepositoryBuild(BaseModel):
   uuid = CharField(default=uuid_generator, index=True)
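QueueItem now carries its own Meta with only_save_dirty = True. That is a peewee model option: with it set, save() issues an UPDATE containing only the fields assigned since the row was loaded, rather than rewriting every column on each save. A runnable sketch of just that option, using an in-memory SQLite database and an invented two-field model rather than Quay's schema:

from peewee import Model, TextField, IntegerField, SqliteDatabase

db = SqliteDatabase(':memory:')

class Item(Model):
  body = TextField()
  retries_remaining = IntegerField(default=5)

  class Meta:
    database = db
    only_save_dirty = True  # UPDATE only the changed fields

db.create_tables([Item])
item = Item.create(body='payload', retries_remaining=5)

item.retries_remaining = 4
item.save()  # UPDATE sets retries_remaining only; body is not re-sent

The database and read_slaves entries mirror what the base model already uses (read_slaves comes from peewee's read-slave extension); only_save_dirty is the performance-relevant addition in this PR.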
@@ -270,7 +270,7 @@ class WorkQueue(object):
       queue_item.processing_expires = new_expiration
       has_change = True
 
-    if updated_data is not None:
+    if updated_data is not None and queue_item.body != updated_data:
       queue_item.body = updated_data
       has_change = True
 
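The new queue_item.body != updated_data comparison pairs with only_save_dirty above: peewee marks a field dirty on any assignment, even one that writes back an identical value, so without the guard every call through this path would re-send the full body column. Continuing the sketch from the previous hunk:

item = Item.get(Item.body == 'payload')
item.body = 'payload'     # same value, yet the field is now dirty
print(item.dirty_fields)  # [<TextField: Item.body>]; save() would re-send body

updated_data = 'payload'
if updated_data is not None and item.body != updated_data:
  item.body = updated_data  # skipped here, so body stays clean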
@@ -40,7 +40,8 @@ class QueueWorker(Worker):
 
     # Add the various operations.
     self.add_operation(self.poll_queue, self._poll_period_seconds)
-    self.add_operation(self.update_queue_metrics, 60)
+    self.add_operation(self.update_queue_metrics,
+                       app.config['QUEUE_WORKER_METRICS_REFRESH_SECONDS'])
     self.add_operation(self.run_watchdog, self._watchdog_period_seconds)
 
   def process_queue_item(self, job_details):
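With the config default above, the metrics refresh moves from a fixed 60 seconds to a tunable 300: each queue worker now polls queue metrics a fifth as often, and deployments can stretch the interval further via QUEUE_WORKER_METRICS_REFRESH_SECONDS. The poll and watchdog periods are untouched.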
@@ -12,7 +12,6 @@ from util.secscan.notifier import process_notification_data
 
 logger = logging.getLogger(__name__)
 
 
-_READING_SECONDS = 120  # 2 minutes
 _PROCESSING_SECONDS = 60 * 60  # 1 hour
 _LAYER_LIMIT = 1000  # The number of layers to request on each page.
@@ -31,7 +30,6 @@ class SecurityNotificationWorker(QueueWorker):
     current_page = data.get('page', None)
 
     while True:
-      self.extend_processing(_READING_SECONDS, json.dumps(data))
       (response_data, should_retry) = secscan_api.get_notification(notification_name,
                                                                    layer_limit=_LAYER_LIMIT,
                                                                    page=current_page)
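The final two hunks drop _READING_SECONDS and the extend_processing call that ran on every iteration of the notification paging loop. Each call was an opportunity to write the queue item back to the database while paging through up to _LAYER_LIMIT (1000) layers at a time, so a notification spanning many pages generated a steady stream of UPDATEs; judging by the remaining constants, the hour-long _PROCESSING_SECONDS window is now expected to cover the whole read.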