This commit is contained in:
root 2013-11-18 18:03:36 +00:00
commit e70be13479
15 changed files with 425 additions and 121 deletions

View file

@ -32,6 +32,7 @@ start the workers:
```
STACK=prod python -m workers.diffsworker -D
STACK=prod python -m workers.dockerfilebuild -D
STACK=prod python -m workers.webhookworker -D
```
bouncing the servers:

View file

@ -131,6 +131,13 @@ def random_string_generator(length=16):
return random_string
class Webhook(BaseModel):
public_id = CharField(default=random_string_generator(length=64),
unique=True, index=True)
repository = ForeignKeyField(Repository)
parameters = TextField()
class AccessToken(BaseModel):
friendly_name = CharField(null=True)
code = CharField(default=random_string_generator(length=64), unique=True,
@ -199,9 +206,10 @@ class QueueItem(BaseModel):
available_after = DateTimeField(default=datetime.now, index=True)
available = BooleanField(default=True, index=True)
processing_expires = DateTimeField(null=True, index=True)
retries_remaining = IntegerField(default=5)
all_models = [User, Repository, Image, AccessToken, Role,
RepositoryPermission, Visibility, RepositoryTag,
EmailConfirmation, FederatedLogin, LoginService, QueueItem,
RepositoryBuild, Team, TeamMember, TeamRole]
RepositoryBuild, Team, TeamMember, TeamRole, Webhook]

View file

@ -2,6 +2,7 @@ import bcrypt
import logging
import dateutil.parser
import operator
import json
from database import *
from util.validation import *
@ -42,6 +43,10 @@ class InvalidRepositoryBuildException(DataModelException):
pass
class InvalidWebhookException(DataModelException):
pass
def create_user(username, password, email):
if not validate_email(email):
raise InvalidEmailAddressException('Invalid email address: %s' % email)
@ -946,3 +951,30 @@ def list_repository_builds(namespace_name, repository_name,
def create_repository_build(repo, access_token, resource_key, tag):
return RepositoryBuild.create(repository=repo, access_token=access_token,
resource_key=resource_key, tag=tag)
def create_webhook(repo, params_obj):
return Webhook.create(repository=repo, parameters=json.dumps(params_obj))
def get_webhook(namespace_name, repository_name, public_id):
joined = Webhook.select().join(Repository)
found = list(joined.where(Repository.namespace == namespace_name,
Repository.name == repository_name,
Webhook.public_id == public_id))
if not found:
raise InvalidWebhookException('No webhook found with id: %s' % public_id)
return found[0]
def list_webhooks(namespace_name, repository_name):
joined = Webhook.select().join(Repository)
return joined.where(Repository.namespace == namespace_name,
Repository.name == repository_name)
def delete_webhook(namespace_name, repository_name, public_id):
webhook = get_webhook(namespace_name, repository_name, public_id)
webhook.delete_instance()

View file

@ -1,13 +1,13 @@
from datetime import datetime, timedelta
from database import QueueItem
from data.database import QueueItem, db
class WorkQueue(object):
def __init__(self, queue_name):
self.queue_name = queue_name
def put(self, message, available_after=0):
def put(self, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after.
@ -16,6 +16,7 @@ class WorkQueue(object):
params = {
'queue_name': self.queue_name,
'body': message,
'retries_remaining': retries_remaining,
}
if available_after:
@ -33,31 +34,35 @@ class WorkQueue(object):
available_or_expired = ((QueueItem.available == True) |
(QueueItem.processing_expires <= now))
# TODO the query and the update should be atomic, but for now we only
# have one worker.
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
QueueItem.available_after <= now,
available_or_expired)
with db.transaction():
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
QueueItem.available_after <= now,
available_or_expired,
QueueItem.retries_remaining > 0)
found = list(avail.limit(1).order_by(QueueItem.available_after))
found = list(avail.limit(1).order_by(QueueItem.available_after))
if found:
item = found[0]
item.available = False
item.processing_expires = now + timedelta(seconds=processing_time)
item.save()
if found:
item = found[0]
item.available = False
item.processing_expires = now + timedelta(seconds=processing_time)
item.retries_remaining -= 1
item.save()
return item
return item
return None
return None
def complete(self, completed_item):
completed_item.delete_instance()
def incomplete(self, incomplete_item):
def incomplete(self, incomplete_item, retry_after=300):
retry_date = datetime.now() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date
incomplete_item.available = True
incomplete_item.save()
image_diff_queue = WorkQueue('imagediff')
dockerfile_build_queue = WorkQueue('dockerfilebuild')
webhook_queue = WorkQueue('webhook')

View file

@ -1,12 +1,11 @@
import logging
import stripe
import re
import requests
import urlparse
import json
from flask import request, make_response, jsonify, abort
from flask.ext.login import login_required, current_user, logout_user
from flask import request, make_response, jsonify, abort, url_for
from flask.ext.login import current_user, logout_user
from flask.ext.principal import identity_changed, AnonymousIdentity
from functools import wraps
from collections import defaultdict
@ -41,7 +40,8 @@ def api_login_required(f):
if not current_user.is_authenticated():
abort(401)
if current_user and current_user.db_user() and current_user.db_user().organization:
if (current_user and current_user.db_user() and
current_user.db_user().organization):
abort(401)
return f(*args, **kwargs)
@ -149,7 +149,7 @@ def convert_user_to_organization():
def change_user_details():
user = current_user.db_user()
user_data = request.get_json();
user_data = request.get_json()
try:
if 'password' in user_data:
@ -267,7 +267,7 @@ def get_matching_entities(prefix):
if permission.can():
try:
organization = model.get_organization(organization_name)
except:
except model.InvalidOrganizationException:
pass
if organization:
@ -275,7 +275,7 @@ def get_matching_entities(prefix):
users = model.get_matching_users(prefix, organization)
def team_view(team):
def entity_team_view(team):
result = {
'name': team.name,
'kind': 'team',
@ -294,20 +294,20 @@ def get_matching_entities(prefix):
return user_json
team_data = [team_view(team) for team in teams]
team_data = [entity_team_view(team) for team in teams]
user_data = [user_view(user) for user in users]
return jsonify({
'results': team_data + user_data
})
def team_view(orgname, t):
view_permission = ViewTeamPermission(orgname, t.name)
role = model.get_team_org_role(t).name
def team_view(orgname, team):
view_permission = ViewTeamPermission(orgname, team.name)
role = model.get_team_org_role(team).name
return {
'id': t.id,
'name': t.name,
'description': t.description,
'id': team.id,
'name': team.name,
'description': team.description,
'can_view': view_permission.can(),
'role': role
}
@ -320,8 +320,9 @@ def create_organization_api():
existing = None
try:
existing = model.get_organization(org_data['name']) or model.get_user(org_data['name'])
except:
existing = (model.get_organization(org_data['name']) or
model.get_user(org_data['name']))
except model.InvalidOrganizationException:
pass
if existing:
@ -332,8 +333,8 @@ def create_organization_api():
return error_resp
try:
organization = model.create_organization(org_data['name'], org_data['email'],
current_user.db_user())
model.create_organization(org_data['name'], org_data['email'],
current_user.db_user())
return make_response('Created', 201)
except model.DataModelException as ex:
error_resp = jsonify({
@ -365,8 +366,6 @@ def org_view(o, teams):
def get_organization(orgname):
permission = OrganizationMemberPermission(orgname)
if permission.can():
user = current_user.db_user()
try:
org = model.get_organization(orgname)
except model.InvalidOrganizationException:
@ -416,7 +415,8 @@ def get_organization_members(orgname):
members = model.get_organization_members_with_teams(org)
for member in members:
if not member.user.username in members_dict:
members_dict[member.user.username] = {'username': member.user.username, 'teams': []}
members_dict[member.user.username] = {'username': member.user.username,
'teams': []}
members_dict[member.user.username]['teams'].append(member.team.name)
@ -447,9 +447,9 @@ def get_organization_private_allowed(orgname):
abort(403)
def member_view(m):
def member_view(member):
return {
'username': m.username
'username': member.username
}
@ -461,25 +461,25 @@ def update_organization_team(orgname, teamname):
if edit_permission.can():
team = None
json = request.get_json()
details = request.get_json()
is_existing = False
try:
team = model.get_organization_team(orgname, teamname)
is_existing = True
except:
except model.InvalidTeamException:
# Create the new team.
description = json['description'] if 'description' in json else ''
role = json['role'] if 'role' in json else 'member'
description = details['description'] if 'description' in details else ''
role = details['role'] if 'role' in details else 'member'
org = model.get_organization(orgname)
team = model.create_team(teamname, org, role, description)
if is_existing:
if 'description' in json:
team.description = json['description']
if 'description' in details:
team.description = details['description']
team.save()
if 'role' in json:
team = model.set_team_org_permission(team, json['role'],
if 'role' in details:
team = model.set_team_org_permission(team, details['role'],
current_user.db_user().username)
resp = jsonify(team_view(orgname, team))
@ -510,12 +510,10 @@ def get_organization_team_members(orgname, teamname):
edit_permission = AdministerOrganizationPermission(orgname)
if view_permission.can():
user = current_user.db_user()
team = None
try:
team = model.get_organization_team(orgname, teamname)
except:
except model.InvalidTeamException:
abort(404)
members = model.get_organization_team_members(team.id)
@ -539,7 +537,7 @@ def update_organization_team_member(orgname, teamname, membername):
# Find the team.
try:
team = model.get_organization_team(orgname, teamname)
except:
except model.InvalidTeamException:
abort(404)
# Find the user.
@ -573,23 +571,23 @@ def delete_organization_team_member(orgname, teamname, membername):
@api_login_required
def create_repo_api():
owner = current_user.db_user()
json = request.get_json()
namespace_name = json['namespace'] if 'namespace' in json else owner.username
req = request.get_json()
namespace_name = req['namespace'] if 'namespace' in req else owner.username
permission = CreateRepositoryPermission(namespace_name)
if permission.can():
repository_name = json['repository']
visibility = json['visibility']
repository_name = req['repository']
visibility = req['visibility']
existing = model.get_repository(namespace_name, repository_name)
if existing:
return make_response('Repository already exists', 400)
visibility = json['visibility']
visibility = req['visibility']
repo = model.create_repository(namespace_name, repository_name, owner,
visibility)
repo.description = json['description']
repo.description = req['description']
repo.save()
return jsonify({
@ -641,7 +639,7 @@ def list_repos_api():
try:
limit = int(limit) if limit else None
except:
except TypeError:
limit = None
include_public = include_public == 'true'
@ -741,7 +739,7 @@ def get_repo_api(namespace, repository):
organization = None
try:
organization = model.get_organization(namespace)
except:
except model.InvalidOrganizationException:
pass
permission = ReadRepositoryPermission(namespace, repository)
@ -805,18 +803,6 @@ def get_repo_builds(namespace, repository):
abort(403) # Permissions denied
@app.route('/api/filedrop/', methods=['POST'])
@api_login_required
def get_filedrop_url():
mime_type = request.get_json()['mimeType']
(url, file_id) = user_files.prepare_for_drop(mime_type)
return jsonify({
'url': url,
'file_id': file_id
})
@app.route('/api/repository/<path:repository>/build/', methods=['POST'])
@api_login_required
@parse_repository_name
@ -844,6 +830,81 @@ def request_repo_build(namespace, repository):
abort(403) # Permissions denied
def webhook_view(webhook):
return {
'public_id': webhook.public_id,
'parameters': json.loads(webhook.parameters),
}
@app.route('/api/repository/<path:repository>/webhook/', methods=['POST'])
@api_login_required
@parse_repository_name
def create_webhook(namespace, repository):
permission = AdministerRepositoryPermission(namespace, repository)
if permission.can():
repo = model.get_repository(namespace, repository)
webhook = model.create_webhook(repo, request.get_json())
resp = jsonify(webhook_view(webhook))
repo_string = '%s/%s' % (namespace, repository)
resp.headers['Location'] = url_for('get_webhook', repository=repo_string,
public_id=webhook.public_id)
return resp
abort(403) # Permissions denied
@app.route('/api/repository/<path:repository>/webhook/<public_id>',
methods=['GET'])
@api_login_required
@parse_repository_name
def get_webhook(namespace, repository, public_id):
permission = AdministerRepositoryPermission(namespace, repository)
if permission.can():
webhook = model.get_webhook(namespace, repository, public_id)
return jsonify(webhook_view(webhook))
abort(403) # Permission denied
@app.route('/api/repository/<path:repository>/webhook/', methods=['GET'])
@api_login_required
@parse_repository_name
def list_webhooks(namespace, repository):
permission = AdministerRepositoryPermission(namespace, repository)
if permission.can():
webhooks = model.list_webhooks(namespace, repository)
return jsonify({
'webhooks': [webhook_view(webhook) for webhook in webhooks]
})
abort(403) # Permission denied
@app.route('/api/repository/<path:repository>/webhook/<public_id>',
methods=['DELETE'])
@api_login_required
@parse_repository_name
def delete_webhook(namespace, repository, public_id):
permission = AdministerRepositoryPermission(namespace, repository)
if permission.can():
model.delete_webhook(namespace, repository, public_id)
return make_response('No Content', 204)
abort(403) # Permission denied
@app.route('/api/filedrop/', methods=['POST'])
@api_login_required
def get_filedrop_url():
mime_type = request.get_json()['mimeType']
(url, file_id) = user_files.prepare_for_drop(mime_type)
return jsonify({
'url': url,
'file_id': file_id
})
def role_view(repo_perm_obj):
return {
'role': repo_perm_obj.role.name,

View file

@ -6,6 +6,7 @@ from flask import request, make_response, jsonify, abort
from functools import wraps
from data import model
from data.queue import webhook_queue
from app import app, mixpanel
from auth.auth import (process_auth, get_authenticated_user,
get_validated_token)
@ -178,17 +179,38 @@ def update_images(namespace, repository):
permission = ModifyRepositoryPermission(namespace, repository)
if permission.can():
repository = model.get_repository(namespace, repository)
if not repository:
repo = model.get_repository(namespace, repository)
if not repo:
# Make sure the repo actually exists.
abort(404)
image_with_checksums = json.loads(request.data)
updated_tags = {}
for image in image_with_checksums:
logger.debug('Setting checksum for image id: %s to %s' %
(image['id'], image['checksum']))
model.set_image_checksum(image['id'], repository, image['checksum'])
updated_tags[image['Tag']] = image['id']
model.set_image_checksum(image['id'], repo, image['checksum'])
# Generate a job for each webhook that has been added to this repo
webhooks = model.list_webhooks(namespace, repository)
for webhook in webhooks:
webhook_data = json.loads(webhook.parameters)
repo_string = '%s/%s' % (namespace, repository)
logger.debug('Creating webhook for repository \'%s\' for url \'%s\'' %
(repo_string, webhook_data['url']))
webhook_data['payload'] = {
'repository': repo_string,
'namespace': namespace,
'name': repository,
'docker_url': 'quay.io/%s' % repo_string,
'homepage': 'https://quay.io/repository/%s' % repo_string,
'visibility': repo.visibility.name,
'updated_tags': updated_tags,
'pushed_image_count': len(image_with_checksums),
}
webhook_queue.put(json.dumps(webhook_data))
return make_response('Updated', 204)

View file

@ -554,11 +554,13 @@
font-size: .4em;
}
form input.ng-invalid.ng-dirty {
form input.ng-invalid.ng-dirty,
*[ng-form] input.ng-invalid.ng-dirty {
background-color: #FDD7D9;
}
form input.ng-valid.ng-dirty {
form input.ng-valid.ng-dirty,
*[ng-form] input.ng-valid.ng-dirty {
background-color: #DDFFEE;
}

View file

@ -559,7 +559,7 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) {
});
};
$scope.roles = [
$scope.roles = [
{ 'id': 'read', 'title': 'Read', 'kind': 'success' },
{ 'id': 'write', 'title': 'Write', 'kind': 'success' },
{ 'id': 'admin', 'title': 'Admin', 'kind': 'primary' }
@ -700,6 +700,31 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) {
$scope.loading = false;
});
$scope.webhooksLoading = true;
$scope.loadWebhooks = function() {
$scope.webhooksLoading = true;
var fetchWebhooks = Restangular.one('repository/' + namespace + '/' + name + '/webhook/');
fetchWebhooks.get().then(function(resp) {
$scope.webhooks = resp.webhooks;
$scope.webhooksLoading = false;
});
};
$scope.createWebhook = function() {
var newWebhook = Restangular.one('repository/' + namespace + '/' + name + '/webhook/');
newWebhook.customPOST($scope.newWebhook).then(function(resp) {
$scope.webhooks.push(resp);
$scope.newWebhook.url = '';
$scope.newWebhookForm.$setPristine();
});
};
$scope.deleteWebhook = function(webhook) {
var deleteWebhookReq = Restangular.one('repository/' + namespace + '/' + name + '/webhook/' + webhook.public_id);
deleteWebhookReq.customDELETE().then(function(resp) {
$scope.webhooks.splice($scope.webhooks.indexOf(webhook), 1);
});
};
}
function UserAdminCtrl($scope, $timeout, $location, Restangular, PlanService, UserService, KeyService, $routeParams) {

View file

@ -23,6 +23,7 @@
<div class="col-md-2">
<ul class="nav nav-pills nav-stacked">
<li class="active"><a href="javascript:void(0)" data-toggle="tab" data-target="#permissions">Permissions</a></li>
<li><a href="javascript:void(0)" data-toggle="tab" data-target="#webhook" ng-click="loadWebhooks()">Webhooks</a></li>
<li><a href="javascript:void(0)" data-toggle="tab" data-target="#publicprivate">Public/Private</a></li>
<li><a href="javascript:void(0)" data-toggle="tab" data-target="#delete">Delete</a></li>
</ul>
@ -145,7 +146,50 @@
</form>
</div>
</div>
</div>
<!-- Webhook tab -->
<div id="webhook" class="tab-pane">
<div class="panel panel-default">
<div class="panel-heading">Push Webhooks
<i class="info-icon fa fa-info-circle" data-placement="left" data-content="URLs which will be invoked when a successful push to the repository occurs."></i>
</div>
<div class="panel-body" ng-show="webhooksLoading">
Loading webhooks: <i class="fa fa-spinner fa-spin fa-2x" style="vertical-align: middle; margin-left: 4px"></i>
</div>
<div class="panel-body" ng-show="!webhooksLoading">
<table class="permissions" ng-form="newWebhookForm">
<thead>
<tr>
<td style="width: 500px;">Webhook URL</td>
<td></td>
</tr>
</thead>
<tbody>
<tr ng-repeat="webhook in webhooks">
<td>{{ webhook.parameters.url }}</td>
<td>
<span class="delete-ui" tabindex="0">
<span class="delete-ui-button" ng-click="deleteWebhook(webhook)"><button class="btn btn-danger">Delete</button></span>
<i class="fa fa-times" bs-tooltip="tooltip.title" data-placement="right" title="Delete Webhook"></i>
</span>
</td>
</tr>
<tr>
<td>
<input type="url" class="form-control" placeholder="New webhook url..." ng-model="newWebhook.url" required>
</td>
<td>
<button class="btn btn-primary" type="submit" ng-click="createWebhook()">Create</button>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
<!-- Public/private tab -->

Binary file not shown.

0
tools/__init__.py Normal file
View file

19
tools/monthlyrevenue.py Normal file
View file

@ -0,0 +1,19 @@
from app import stripe
EXCLUDE_CID = {'cus_2iVlmwz8CpHgOj'}
offset = 0
total_monthly_revenue = 0
batch = stripe.Customer.all(count=100, offset=offset)
while batch.data:
for cust in batch.data:
if cust.id not in EXCLUDE_CID and cust.subscription:
sub = cust.subscription
total_monthly_revenue += sub.plan.amount * sub.quantity
offset += len(batch.data)
batch = stripe.Customer.all(count=100, offset=offset)
dollars = total_monthly_revenue / 100
cents = total_monthly_revenue % 100
print 'Monthly revenue: $%d.%02d' % (dollars, cents)

View file

@ -1,15 +1,11 @@
import logging
import json
import daemon
import time
import argparse
from apscheduler.scheduler import Scheduler
from data.queue import image_diff_queue
from data.database import db as db_connection
from data.model import DataModelException
from endpoints.registry import process_image_changes
from workers.worker import Worker
root_logger = logging.getLogger('')
@ -21,20 +17,13 @@ formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
def process_work_items():
logger.debug('Getting work item from queue.')
class DiffsWorker(Worker):
def process_queue_item(self, job_details):
image_id = job_details['image_id']
namespace = job_details['namespace']
repository = job_details['repository']
item = image_diff_queue.get()
while item:
logger.debug('Queue gave us some work: %s' % item.body)
request = json.loads(item.body)
try:
image_id = request['image_id']
namespace = request['namespace']
repository = request['repository']
process_image_changes(namespace, repository, image_id)
except DataModelException:
# This exception is unrecoverable, and the item should continue and be
@ -43,27 +32,7 @@ def process_work_items():
(image_id, namespace, repository))
logger.warning(msg)
image_diff_queue.complete(item)
item = image_diff_queue.get()
logger.debug('No more work.')
if not db_connection.is_closed():
logger.debug('Closing thread db connection.')
db_connection.close()
def start_worker():
logger.debug("Scheduling worker.")
sched = Scheduler()
sched.start()
sched.add_interval_job(process_work_items, seconds=30)
while True:
time.sleep(60 * 60 * 24) # sleep one day, basically forever
return True
parser = argparse.ArgumentParser(description='Worker daemon to compute diffs')
@ -74,15 +43,17 @@ parser.add_argument('--log', default='diffsworker.log',
args = parser.parse_args()
worker = DiffsWorker(image_diff_queue)
if args.D:
handler = logging.FileHandler(args.log)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
with daemon.DaemonContext(files_preserve=[handler.stream]):
start_worker()
worker.start()
else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)
start_worker()
worker.start()

61
workers/webhookworker.py Normal file
View file

@ -0,0 +1,61 @@
import logging
import daemon
import argparse
import requests
import json
from data.queue import webhook_queue
from workers.worker import Worker
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
class WebhookWorker(Worker):
def process_queue_item(self, job_details):
url = job_details['url']
payload = job_details['payload']
headers = {'Content-type': 'application/json'}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
logger.error('%s response for webhook to url: %s' % (resp.status_code,
url))
return False
except requests.exceptions.RequestException as ex:
logger.exception('Webhook was unable to be sent: %s' % ex.message)
return False
return True
parser = argparse.ArgumentParser(description='Worker daemon to send webhooks')
parser.add_argument('-D', action='store_true', default=False,
help='Run the worker in daemon mode.')
parser.add_argument('--log', default='webhooks.log',
help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()
worker = WebhookWorker(webhook_queue, poll_period_seconds=15,
reservation_seconds=3600)
if args.D:
handler = logging.FileHandler(args.log)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
with daemon.DaemonContext(files_preserve=[handler.stream]):
worker.start()
else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)
worker.start()

53
workers/worker.py Normal file
View file

@ -0,0 +1,53 @@
import logging
import json
from threading import Event
from apscheduler.scheduler import Scheduler
logger = logging.getLogger(__name__)
class Worker(object):
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300):
self._sched = Scheduler()
self._poll_period_seconds = poll_period_seconds
self._reservation_seconds = reservation_seconds
self._stop = Event()
self._queue = queue
def process_queue_item(self, job_details):
""" Return True if complete, False if it should be retried. """
raise NotImplementedError('Workers must implement run.')
def poll_queue(self):
logger.debug('Getting work item from queue.')
item = self._queue.get()
while item:
logger.debug('Queue gave us some work: %s' % item.body)
job_details = json.loads(item.body)
if self.process_queue_item(job_details):
self._queue.complete(item)
else:
logger.warning('An error occurred processing request: %s' % item.body)
self._queue.incomplete(item)
item = self._queue.get(processing_time=self._reservation_seconds)
logger.debug('No more work.')
def start(self):
logger.debug("Scheduling worker.")
self._sched.start()
self._sched.add_interval_job(self.poll_queue,
seconds=self._poll_period_seconds)
while not self._stop.wait(1):
pass
def join(self):
self._stop.set()