Merge pull request #2204 from jzelinskie/429builds
add rate limiting to build queues
This commit is contained in:
commit
00eafff747
7 changed files with 78 additions and 9 deletions
|
@ -400,3 +400,9 @@ class DefaultConfig(object):
|
||||||
|
|
||||||
# Location of the static marketing site.
|
# Location of the static marketing site.
|
||||||
STATIC_SITE_BUCKET = None
|
STATIC_SITE_BUCKET = None
|
||||||
|
|
||||||
|
# Count and duration used to produce a rate of builds allowed to be queued per repository before
|
||||||
|
# rejecting requests. Values less than zero disable rate limiting.
|
||||||
|
# Example: 10 builds per minute is accomplished by setting ITEMS = 10, SECS = 60
|
||||||
|
MAX_BUILD_QUEUE_RATE_ITEMS = -1
|
||||||
|
MAX_BUILD_QUEUE_RATE_SECS = -1
|
||||||
|
|
|
@ -81,6 +81,19 @@ class WorkQueue(object):
|
||||||
._available_jobs(now, name_match_query)
|
._available_jobs(now, name_match_query)
|
||||||
.where(~(QueueItem.queue_name << running_query)))
|
.where(~(QueueItem.queue_name << running_query)))
|
||||||
|
|
||||||
|
def num_available_jobs_between(self, available_min_time, available_max_time, canonical_name_list):
|
||||||
|
"""
|
||||||
|
Returns the number of available queue items with a given prefix, between the two provided times.
|
||||||
|
"""
|
||||||
|
def strip_slash(name):
|
||||||
|
return name.lstrip('/')
|
||||||
|
canonical_name_list = map(strip_slash, canonical_name_list)
|
||||||
|
|
||||||
|
available = self._available_jobs(available_max_time,
|
||||||
|
'/'.join([self._queue_name] + canonical_name_list) + '%')
|
||||||
|
|
||||||
|
return available.where(QueueItem.available_after >= available_min_time).count()
|
||||||
|
|
||||||
def _name_match_query(self):
|
def _name_match_query(self):
|
||||||
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
||||||
|
|
||||||
|
|
|
@ -14,9 +14,9 @@ from buildtrigger.basehandler import BuildTriggerHandler
|
||||||
from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource,
|
from endpoints.api import (RepositoryParamResource, parse_args, query_param, nickname, resource,
|
||||||
require_repo_read, require_repo_write, validate_json_request,
|
require_repo_read, require_repo_write, validate_json_request,
|
||||||
ApiResource, internal_only, format_date, api, path_param,
|
ApiResource, internal_only, format_date, api, path_param,
|
||||||
require_repo_admin)
|
require_repo_admin, abort)
|
||||||
from endpoints.exception import Unauthorized, NotFound, InvalidRequest
|
from endpoints.exception import Unauthorized, NotFound, InvalidRequest
|
||||||
from endpoints.building import start_build, PreparedBuild
|
from endpoints.building import start_build, PreparedBuild, MaximumBuildsQueuedException
|
||||||
from data import database
|
from data import database
|
||||||
from data import model
|
from data import model
|
||||||
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
|
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
|
||||||
|
@ -287,7 +287,11 @@ class RepositoryBuildList(RepositoryParamResource):
|
||||||
prepared.is_manual = True
|
prepared.is_manual = True
|
||||||
prepared.metadata = {}
|
prepared.metadata = {}
|
||||||
|
|
||||||
|
try:
|
||||||
build_request = start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
build_request = start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
||||||
|
except MaximumBuildsQueuedException:
|
||||||
|
abort(429, message='Maximum queued build rate exceeded.')
|
||||||
|
|
||||||
resp = build_status_view(build_request)
|
resp = build_status_view(build_request)
|
||||||
repo_string = '%s/%s' % (namespace, repository)
|
repo_string = '%s/%s' % (namespace, repository)
|
||||||
headers = {
|
headers = {
|
||||||
|
|
|
@ -15,10 +15,10 @@ from buildtrigger.triggerutil import (TriggerDeactivationException,
|
||||||
RepositoryReadException, TriggerStartException)
|
RepositoryReadException, TriggerStartException)
|
||||||
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
|
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
|
||||||
log_action, request_error, query_param, parse_args, internal_only,
|
log_action, request_error, query_param, parse_args, internal_only,
|
||||||
validate_json_request, api, path_param)
|
validate_json_request, api, path_param, abort)
|
||||||
from endpoints.exception import NotFound, Unauthorized, InvalidRequest
|
from endpoints.exception import NotFound, Unauthorized, InvalidRequest
|
||||||
from endpoints.api.build import build_status_view, trigger_view, RepositoryBuildStatus
|
from endpoints.api.build import build_status_view, trigger_view, RepositoryBuildStatus
|
||||||
from endpoints.building import start_build
|
from endpoints.building import start_build, MaximumBuildsQueuedException
|
||||||
from data import model
|
from data import model
|
||||||
from auth.permissions import (UserAdminPermission, AdministerOrganizationPermission,
|
from auth.permissions import (UserAdminPermission, AdministerOrganizationPermission,
|
||||||
ReadRepositoryPermission, AdministerRepositoryPermission)
|
ReadRepositoryPermission, AdministerRepositoryPermission)
|
||||||
|
@ -436,6 +436,8 @@ class ActivateBuildTrigger(RepositoryParamResource):
|
||||||
build_request = start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
build_request = start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
||||||
except TriggerStartException as tse:
|
except TriggerStartException as tse:
|
||||||
raise InvalidRequest(tse.message)
|
raise InvalidRequest(tse.message)
|
||||||
|
except MaximumBuildsQueuedException:
|
||||||
|
abort(429, message='Maximum queued build rate exceeded.')
|
||||||
|
|
||||||
resp = build_status_view(build_request)
|
resp = build_status_view(build_request)
|
||||||
repo_string = '%s/%s' % (namespace_name, repo_name)
|
repo_string = '%s/%s' % (namespace_name, repo_name)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from flask import request
|
from flask import request
|
||||||
|
|
||||||
from app import app, dockerfile_build_queue
|
from app import app, dockerfile_build_queue
|
||||||
|
@ -11,10 +13,32 @@ from endpoints.notificationhelper import spawn_notification
|
||||||
from util.names import escape_tag
|
from util.names import escape_tag
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MAX_BUILD_QUEUE_RATE_ITEMS = app.config.get('MAX_BUILD_QUEUE_RATE_ITEMS', -1)
|
||||||
|
MAX_BUILD_QUEUE_RATE_SECS = app.config.get('MAX_BUILD_QUEUE_RATE_SECS', -1)
|
||||||
|
|
||||||
|
|
||||||
|
class MaximumBuildsQueuedException(Exception):
|
||||||
|
"""
|
||||||
|
This exception is raised when a build is requested, but the incoming build
|
||||||
|
would exceed the configured maximum build rate.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def start_build(repository, prepared_build, pull_robot_name=None):
|
def start_build(repository, prepared_build, pull_robot_name=None):
|
||||||
|
if MAX_BUILD_QUEUE_RATE_ITEMS > 0 and MAX_BUILD_QUEUE_RATE_SECS > 0:
|
||||||
|
queue_item_canonical_name = [repository.namespace_user.username, repository.name]
|
||||||
|
now = datetime.utcnow()
|
||||||
|
available_min = now - timedelta(seconds=MAX_BUILD_QUEUE_RATE_SECS)
|
||||||
|
available_builds = dockerfile_build_queue.num_available_jobs_between(available_min,
|
||||||
|
now,
|
||||||
|
queue_item_canonical_name)
|
||||||
|
if available_builds >= MAX_BUILD_QUEUE_RATE_ITEMS:
|
||||||
|
raise MaximumBuildsQueuedException()
|
||||||
|
|
||||||
host = app.config['SERVER_HOSTNAME']
|
host = app.config['SERVER_HOSTNAME']
|
||||||
repo_path = '%s/%s/%s' % (host, repository.namespace_user.username, repository.name)
|
repo_path = '%s/%s/%s' % (host, repository.namespace_user.username, repository.name)
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ from util.http import abort
|
||||||
from buildtrigger.basehandler import BuildTriggerHandler
|
from buildtrigger.basehandler import BuildTriggerHandler
|
||||||
from buildtrigger.triggerutil import (ValidationRequestException, SkipRequestException,
|
from buildtrigger.triggerutil import (ValidationRequestException, SkipRequestException,
|
||||||
InvalidPayloadException)
|
InvalidPayloadException)
|
||||||
from endpoints.building import start_build
|
from endpoints.building import start_build, MaximumBuildsQueuedException
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -104,7 +104,10 @@ def build_trigger_webhook(trigger_uuid, **kwargs):
|
||||||
|
|
||||||
pull_robot_name = model.build.get_pull_robot_name(trigger)
|
pull_robot_name = model.build.get_pull_robot_name(trigger)
|
||||||
repo = model.repository.get_repository(namespace, repository)
|
repo = model.repository.get_repository(namespace, repository)
|
||||||
|
try:
|
||||||
start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
start_build(repo, prepared, pull_robot_name=pull_robot_name)
|
||||||
|
except MaximumBuildsQueuedException:
|
||||||
|
abort(429, message='Maximum queued build rate exceeded.')
|
||||||
|
|
||||||
return make_response('Okay')
|
return make_response('Okay')
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ import unittest
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
|
@ -226,6 +227,22 @@ class TestQueue(QueueTestCase):
|
||||||
self.assertEqual(self.reporter.running_count, 0)
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 3)
|
self.assertEqual(self.reporter.total, 3)
|
||||||
|
|
||||||
|
def test_num_available_between(self):
|
||||||
|
now = datetime.utcnow()
|
||||||
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
|
||||||
|
self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5)
|
||||||
|
|
||||||
|
# Partial results
|
||||||
|
count = self.queue.num_available_jobs_between(now-timedelta(seconds=8), now, ['abc'])
|
||||||
|
self.assertEqual(1, count)
|
||||||
|
|
||||||
|
# All results
|
||||||
|
count = self.queue.num_available_jobs_between(now-timedelta(seconds=20), now, ['/abc'])
|
||||||
|
self.assertEqual(2, count)
|
||||||
|
|
||||||
|
# No results
|
||||||
|
count = self.queue.num_available_jobs_between(now, now, 'abc')
|
||||||
|
self.assertEqual(0, count)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
||||||
|
|
Reference in a new issue