diff --git a/README.md b/README.md index 5a43f8325..35ea35a87 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ start the workers: ``` STACK=prod python -m workers.diffsworker -D STACK=prod python -m workers.dockerfilebuild -D +STACK=prod python -m workers.webhookworker -D ``` bouncing the servers: diff --git a/data/database.py b/data/database.py index 1dd51788a..1a8291d5a 100644 --- a/data/database.py +++ b/data/database.py @@ -131,6 +131,13 @@ def random_string_generator(length=16): return random_string +class Webhook(BaseModel): + public_id = CharField(default=random_string_generator(length=64), + unique=True, index=True) + repository = ForeignKeyField(Repository) + parameters = TextField() + + class AccessToken(BaseModel): friendly_name = CharField(null=True) code = CharField(default=random_string_generator(length=64), unique=True, @@ -199,9 +206,10 @@ class QueueItem(BaseModel): available_after = DateTimeField(default=datetime.now, index=True) available = BooleanField(default=True, index=True) processing_expires = DateTimeField(null=True, index=True) + retries_remaining = IntegerField(default=5) all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem, - RepositoryBuild, Team, TeamMember, TeamRole] + RepositoryBuild, Team, TeamMember, TeamRole, Webhook] diff --git a/data/model.py b/data/model.py index ea42ef7c9..28c910d28 100644 --- a/data/model.py +++ b/data/model.py @@ -2,6 +2,7 @@ import bcrypt import logging import dateutil.parser import operator +import json from database import * from util.validation import * @@ -42,6 +43,10 @@ class InvalidRepositoryBuildException(DataModelException): pass +class InvalidWebhookException(DataModelException): + pass + + def create_user(username, password, email): if not validate_email(email): raise InvalidEmailAddressException('Invalid email address: %s' % email) @@ -946,3 +951,30 @@ def list_repository_builds(namespace_name, repository_name, def create_repository_build(repo, access_token, resource_key, tag): return RepositoryBuild.create(repository=repo, access_token=access_token, resource_key=resource_key, tag=tag) + + +def create_webhook(repo, params_obj): + return Webhook.create(repository=repo, parameters=json.dumps(params_obj)) + + +def get_webhook(namespace_name, repository_name, public_id): + joined = Webhook.select().join(Repository) + found = list(joined.where(Repository.namespace == namespace_name, + Repository.name == repository_name, + Webhook.public_id == public_id)) + + if not found: + raise InvalidWebhookException('No webhook found with id: %s' % public_id) + + return found[0] + + +def list_webhooks(namespace_name, repository_name): + joined = Webhook.select().join(Repository) + return joined.where(Repository.namespace == namespace_name, + Repository.name == repository_name) + + +def delete_webhook(namespace_name, repository_name, public_id): + webhook = get_webhook(namespace_name, repository_name, public_id) + webhook.delete_instance() diff --git a/data/queue.py b/data/queue.py index 8e63c4d17..ef0026e52 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,13 +1,13 @@ from datetime import datetime, timedelta -from database import QueueItem +from data.database import QueueItem, db class WorkQueue(object): def __init__(self, queue_name): self.queue_name = queue_name - def put(self, message, available_after=0): + def put(self, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. @@ -16,6 +16,7 @@ class WorkQueue(object): params = { 'queue_name': self.queue_name, 'body': message, + 'retries_remaining': retries_remaining, } if available_after: @@ -33,31 +34,35 @@ class WorkQueue(object): available_or_expired = ((QueueItem.available == True) | (QueueItem.processing_expires <= now)) - # TODO the query and the update should be atomic, but for now we only - # have one worker. - avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, - QueueItem.available_after <= now, - available_or_expired) + with db.transaction(): + avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, + QueueItem.available_after <= now, + available_or_expired, + QueueItem.retries_remaining > 0) - found = list(avail.limit(1).order_by(QueueItem.available_after)) + found = list(avail.limit(1).order_by(QueueItem.available_after)) - if found: - item = found[0] - item.available = False - item.processing_expires = now + timedelta(seconds=processing_time) - item.save() + if found: + item = found[0] + item.available = False + item.processing_expires = now + timedelta(seconds=processing_time) + item.retries_remaining -= 1 + item.save() - return item + return item - return None + return None def complete(self, completed_item): completed_item.delete_instance() - def incomplete(self, incomplete_item): + def incomplete(self, incomplete_item, retry_after=300): + retry_date = datetime.now() + timedelta(seconds=retry_after) + incomplete_item.available_after = retry_date incomplete_item.available = True incomplete_item.save() image_diff_queue = WorkQueue('imagediff') dockerfile_build_queue = WorkQueue('dockerfilebuild') +webhook_queue = WorkQueue('webhook') diff --git a/endpoints/api.py b/endpoints/api.py index 062ad69e1..b8ab44c77 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -1,12 +1,11 @@ import logging import stripe -import re import requests import urlparse import json -from flask import request, make_response, jsonify, abort -from flask.ext.login import login_required, current_user, logout_user +from flask import request, make_response, jsonify, abort, url_for +from flask.ext.login import current_user, logout_user from flask.ext.principal import identity_changed, AnonymousIdentity from functools import wraps from collections import defaultdict @@ -41,7 +40,8 @@ def api_login_required(f): if not current_user.is_authenticated(): abort(401) - if current_user and current_user.db_user() and current_user.db_user().organization: + if (current_user and current_user.db_user() and + current_user.db_user().organization): abort(401) return f(*args, **kwargs) @@ -149,7 +149,7 @@ def convert_user_to_organization(): def change_user_details(): user = current_user.db_user() - user_data = request.get_json(); + user_data = request.get_json() try: if 'password' in user_data: @@ -267,7 +267,7 @@ def get_matching_entities(prefix): if permission.can(): try: organization = model.get_organization(organization_name) - except: + except model.InvalidOrganizationException: pass if organization: @@ -275,7 +275,7 @@ def get_matching_entities(prefix): users = model.get_matching_users(prefix, organization) - def team_view(team): + def entity_team_view(team): result = { 'name': team.name, 'kind': 'team', @@ -294,20 +294,20 @@ def get_matching_entities(prefix): return user_json - team_data = [team_view(team) for team in teams] + team_data = [entity_team_view(team) for team in teams] user_data = [user_view(user) for user in users] return jsonify({ 'results': team_data + user_data }) -def team_view(orgname, t): - view_permission = ViewTeamPermission(orgname, t.name) - role = model.get_team_org_role(t).name +def team_view(orgname, team): + view_permission = ViewTeamPermission(orgname, team.name) + role = model.get_team_org_role(team).name return { - 'id': t.id, - 'name': t.name, - 'description': t.description, + 'id': team.id, + 'name': team.name, + 'description': team.description, 'can_view': view_permission.can(), 'role': role } @@ -320,8 +320,9 @@ def create_organization_api(): existing = None try: - existing = model.get_organization(org_data['name']) or model.get_user(org_data['name']) - except: + existing = (model.get_organization(org_data['name']) or + model.get_user(org_data['name'])) + except model.InvalidOrganizationException: pass if existing: @@ -332,8 +333,8 @@ def create_organization_api(): return error_resp try: - organization = model.create_organization(org_data['name'], org_data['email'], - current_user.db_user()) + model.create_organization(org_data['name'], org_data['email'], + current_user.db_user()) return make_response('Created', 201) except model.DataModelException as ex: error_resp = jsonify({ @@ -365,8 +366,6 @@ def org_view(o, teams): def get_organization(orgname): permission = OrganizationMemberPermission(orgname) if permission.can(): - user = current_user.db_user() - try: org = model.get_organization(orgname) except model.InvalidOrganizationException: @@ -403,7 +402,7 @@ def change_organization_details(orgname): @api_login_required def get_organization_members(orgname): permission = AdministerOrganizationPermission(orgname) - if permission.can(): + if permission.can(): try: org = model.get_organization(orgname) except model.InvalidOrganizationException: @@ -416,9 +415,10 @@ def get_organization_members(orgname): members = model.get_organization_members_with_teams(org) for member in members: if not member.user.username in members_dict: - members_dict[member.user.username] = {'username': member.user.username, 'teams': []} + members_dict[member.user.username] = {'username': member.user.username, + 'teams': []} - members_dict[member.user.username]['teams'].append(member.team.name) + members_dict[member.user.username]['teams'].append(member.team.name) return jsonify({'members': members_dict}) @@ -447,9 +447,9 @@ def get_organization_private_allowed(orgname): abort(403) -def member_view(m): +def member_view(member): return { - 'username': m.username + 'username': member.username } @@ -461,25 +461,25 @@ def update_organization_team(orgname, teamname): if edit_permission.can(): team = None - json = request.get_json() + details = request.get_json() is_existing = False try: team = model.get_organization_team(orgname, teamname) is_existing = True - except: + except model.InvalidTeamException: # Create the new team. - description = json['description'] if 'description' in json else '' - role = json['role'] if 'role' in json else 'member' + description = details['description'] if 'description' in details else '' + role = details['role'] if 'role' in details else 'member' org = model.get_organization(orgname) team = model.create_team(teamname, org, role, description) if is_existing: - if 'description' in json: - team.description = json['description'] + if 'description' in details: + team.description = details['description'] team.save() - if 'role' in json: - team = model.set_team_org_permission(team, json['role'], + if 'role' in details: + team = model.set_team_org_permission(team, details['role'], current_user.db_user().username) resp = jsonify(team_view(orgname, team)) @@ -510,12 +510,10 @@ def get_organization_team_members(orgname, teamname): edit_permission = AdministerOrganizationPermission(orgname) if view_permission.can(): - user = current_user.db_user() team = None - try: team = model.get_organization_team(orgname, teamname) - except: + except model.InvalidTeamException: abort(404) members = model.get_organization_team_members(team.id) @@ -539,7 +537,7 @@ def update_organization_team_member(orgname, teamname, membername): # Find the team. try: team = model.get_organization_team(orgname, teamname) - except: + except model.InvalidTeamException: abort(404) # Find the user. @@ -573,23 +571,23 @@ def delete_organization_team_member(orgname, teamname, membername): @api_login_required def create_repo_api(): owner = current_user.db_user() - json = request.get_json() - namespace_name = json['namespace'] if 'namespace' in json else owner.username + req = request.get_json() + namespace_name = req['namespace'] if 'namespace' in req else owner.username permission = CreateRepositoryPermission(namespace_name) if permission.can(): - repository_name = json['repository'] - visibility = json['visibility'] + repository_name = req['repository'] + visibility = req['visibility'] existing = model.get_repository(namespace_name, repository_name) if existing: return make_response('Repository already exists', 400) - visibility = json['visibility'] + visibility = req['visibility'] repo = model.create_repository(namespace_name, repository_name, owner, visibility) - repo.description = json['description'] + repo.description = req['description'] repo.save() return jsonify({ @@ -641,7 +639,7 @@ def list_repos_api(): try: limit = int(limit) if limit else None - except: + except TypeError: limit = None include_public = include_public == 'true' @@ -741,7 +739,7 @@ def get_repo_api(namespace, repository): organization = None try: organization = model.get_organization(namespace) - except: + except model.InvalidOrganizationException: pass permission = ReadRepositoryPermission(namespace, repository) @@ -805,18 +803,6 @@ def get_repo_builds(namespace, repository): abort(403) # Permissions denied - -@app.route('/api/filedrop/', methods=['POST']) -@api_login_required -def get_filedrop_url(): - mime_type = request.get_json()['mimeType'] - (url, file_id) = user_files.prepare_for_drop(mime_type) - return jsonify({ - 'url': url, - 'file_id': file_id - }) - - @app.route('/api/repository//build/', methods=['POST']) @api_login_required @parse_repository_name @@ -844,6 +830,81 @@ def request_repo_build(namespace, repository): abort(403) # Permissions denied +def webhook_view(webhook): + return { + 'public_id': webhook.public_id, + 'parameters': json.loads(webhook.parameters), + } + + +@app.route('/api/repository//webhook/', methods=['POST']) +@api_login_required +@parse_repository_name +def create_webhook(namespace, repository): + permission = AdministerRepositoryPermission(namespace, repository) + if permission.can(): + repo = model.get_repository(namespace, repository) + webhook = model.create_webhook(repo, request.get_json()) + resp = jsonify(webhook_view(webhook)) + repo_string = '%s/%s' % (namespace, repository) + resp.headers['Location'] = url_for('get_webhook', repository=repo_string, + public_id=webhook.public_id) + return resp + + abort(403) # Permissions denied + + +@app.route('/api/repository//webhook/', + methods=['GET']) +@api_login_required +@parse_repository_name +def get_webhook(namespace, repository, public_id): + permission = AdministerRepositoryPermission(namespace, repository) + if permission.can(): + webhook = model.get_webhook(namespace, repository, public_id) + return jsonify(webhook_view(webhook)) + + abort(403) # Permission denied + + +@app.route('/api/repository//webhook/', methods=['GET']) +@api_login_required +@parse_repository_name +def list_webhooks(namespace, repository): + permission = AdministerRepositoryPermission(namespace, repository) + if permission.can(): + webhooks = model.list_webhooks(namespace, repository) + return jsonify({ + 'webhooks': [webhook_view(webhook) for webhook in webhooks] + }) + + abort(403) # Permission denied + + +@app.route('/api/repository//webhook/', + methods=['DELETE']) +@api_login_required +@parse_repository_name +def delete_webhook(namespace, repository, public_id): + permission = AdministerRepositoryPermission(namespace, repository) + if permission.can(): + model.delete_webhook(namespace, repository, public_id) + return make_response('No Content', 204) + + abort(403) # Permission denied + + +@app.route('/api/filedrop/', methods=['POST']) +@api_login_required +def get_filedrop_url(): + mime_type = request.get_json()['mimeType'] + (url, file_id) = user_files.prepare_for_drop(mime_type) + return jsonify({ + 'url': url, + 'file_id': file_id + }) + + def role_view(repo_perm_obj): return { 'role': repo_perm_obj.role.name, @@ -1381,7 +1442,7 @@ def org_invoices_api(orgname): if not organization.stripe_id: abort(404) - invoices = stripe.Invoice.all(customer=organization.stripe_id, count=12) + invoices = stripe.Invoice.all(customer=organization.stripe_id, count=12) return jsonify({ 'invoices': [invoice_view(i) for i in invoices.data] }) diff --git a/endpoints/index.py b/endpoints/index.py index 0fb1ad990..207ace697 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -6,6 +6,7 @@ from flask import request, make_response, jsonify, abort from functools import wraps from data import model +from data.queue import webhook_queue from app import app, mixpanel from auth.auth import (process_auth, get_authenticated_user, get_validated_token) @@ -178,17 +179,38 @@ def update_images(namespace, repository): permission = ModifyRepositoryPermission(namespace, repository) if permission.can(): - repository = model.get_repository(namespace, repository) - if not repository: + repo = model.get_repository(namespace, repository) + if not repo: # Make sure the repo actually exists. abort(404) image_with_checksums = json.loads(request.data) + updated_tags = {} for image in image_with_checksums: logger.debug('Setting checksum for image id: %s to %s' % (image['id'], image['checksum'])) - model.set_image_checksum(image['id'], repository, image['checksum']) + updated_tags[image['Tag']] = image['id'] + model.set_image_checksum(image['id'], repo, image['checksum']) + + # Generate a job for each webhook that has been added to this repo + webhooks = model.list_webhooks(namespace, repository) + for webhook in webhooks: + webhook_data = json.loads(webhook.parameters) + repo_string = '%s/%s' % (namespace, repository) + logger.debug('Creating webhook for repository \'%s\' for url \'%s\'' % + (repo_string, webhook_data['url'])) + webhook_data['payload'] = { + 'repository': repo_string, + 'namespace': namespace, + 'name': repository, + 'docker_url': 'quay.io/%s' % repo_string, + 'homepage': 'https://quay.io/repository/%s' % repo_string, + 'visibility': repo.visibility.name, + 'updated_tags': updated_tags, + 'pushed_image_count': len(image_with_checksums), + } + webhook_queue.put(json.dumps(webhook_data)) return make_response('Updated', 204) diff --git a/static/css/quay.css b/static/css/quay.css index dcf702505..bf3f1124a 100644 --- a/static/css/quay.css +++ b/static/css/quay.css @@ -554,11 +554,13 @@ font-size: .4em; } -form input.ng-invalid.ng-dirty { +form input.ng-invalid.ng-dirty, +*[ng-form] input.ng-invalid.ng-dirty { background-color: #FDD7D9; } -form input.ng-valid.ng-dirty { +form input.ng-valid.ng-dirty, +*[ng-form] input.ng-valid.ng-dirty { background-color: #DDFFEE; } diff --git a/static/js/controllers.js b/static/js/controllers.js index 2d475f5f7..8bba4f712 100644 --- a/static/js/controllers.js +++ b/static/js/controllers.js @@ -559,7 +559,7 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) { }); }; - $scope.roles = [ + $scope.roles = [ { 'id': 'read', 'title': 'Read', 'kind': 'success' }, { 'id': 'write', 'title': 'Write', 'kind': 'success' }, { 'id': 'admin', 'title': 'Admin', 'kind': 'primary' } @@ -700,6 +700,31 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) { $scope.loading = false; }); + $scope.webhooksLoading = true; + $scope.loadWebhooks = function() { + $scope.webhooksLoading = true; + var fetchWebhooks = Restangular.one('repository/' + namespace + '/' + name + '/webhook/'); + fetchWebhooks.get().then(function(resp) { + $scope.webhooks = resp.webhooks; + $scope.webhooksLoading = false; + }); + }; + + $scope.createWebhook = function() { + var newWebhook = Restangular.one('repository/' + namespace + '/' + name + '/webhook/'); + newWebhook.customPOST($scope.newWebhook).then(function(resp) { + $scope.webhooks.push(resp); + $scope.newWebhook.url = ''; + $scope.newWebhookForm.$setPristine(); + }); + }; + + $scope.deleteWebhook = function(webhook) { + var deleteWebhookReq = Restangular.one('repository/' + namespace + '/' + name + '/webhook/' + webhook.public_id); + deleteWebhookReq.customDELETE().then(function(resp) { + $scope.webhooks.splice($scope.webhooks.indexOf(webhook), 1); + }); + }; } function UserAdminCtrl($scope, $timeout, $location, Restangular, PlanService, UserService, KeyService, $routeParams) { diff --git a/static/partials/repo-admin.html b/static/partials/repo-admin.html index ada422d74..d9016b83a 100644 --- a/static/partials/repo-admin.html +++ b/static/partials/repo-admin.html @@ -23,6 +23,7 @@
@@ -145,7 +146,50 @@
+ + +
+
+
Push Webhooks + +
+ +
+ Loading webhooks: +
+ +
+ + + + + + + + + + + + + + + + + + +
Webhook URL
{{ webhook.parameters.url }} + + + + +
+ + + +
+
+
diff --git a/test/data/test.db b/test/data/test.db index e7e752706..6a2cb0c86 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tools/monthlyrevenue.py b/tools/monthlyrevenue.py new file mode 100644 index 000000000..9472eeda6 --- /dev/null +++ b/tools/monthlyrevenue.py @@ -0,0 +1,19 @@ +from app import stripe + +EXCLUDE_CID = {'cus_2iVlmwz8CpHgOj'} + +offset = 0 +total_monthly_revenue = 0 + +batch = stripe.Customer.all(count=100, offset=offset) +while batch.data: + for cust in batch.data: + if cust.id not in EXCLUDE_CID and cust.subscription: + sub = cust.subscription + total_monthly_revenue += sub.plan.amount * sub.quantity + offset += len(batch.data) + batch = stripe.Customer.all(count=100, offset=offset) + +dollars = total_monthly_revenue / 100 +cents = total_monthly_revenue % 100 +print 'Monthly revenue: $%d.%02d' % (dollars, cents) diff --git a/workers/diffsworker.py b/workers/diffsworker.py index cacf48856..1d6ee56af 100644 --- a/workers/diffsworker.py +++ b/workers/diffsworker.py @@ -1,15 +1,11 @@ import logging -import json import daemon -import time import argparse -from apscheduler.scheduler import Scheduler - from data.queue import image_diff_queue -from data.database import db as db_connection from data.model import DataModelException from endpoints.registry import process_image_changes +from workers.worker import Worker root_logger = logging.getLogger('') @@ -21,20 +17,13 @@ formatter = logging.Formatter(FORMAT) logger = logging.getLogger(__name__) -def process_work_items(): - logger.debug('Getting work item from queue.') +class DiffsWorker(Worker): + def process_queue_item(self, job_details): + image_id = job_details['image_id'] + namespace = job_details['namespace'] + repository = job_details['repository'] - item = image_diff_queue.get() - - while item: - logger.debug('Queue gave us some work: %s' % item.body) - - request = json.loads(item.body) try: - image_id = request['image_id'] - namespace = request['namespace'] - repository = request['repository'] - process_image_changes(namespace, repository, image_id) except DataModelException: # This exception is unrecoverable, and the item should continue and be @@ -43,27 +32,7 @@ def process_work_items(): (image_id, namespace, repository)) logger.warning(msg) - image_diff_queue.complete(item) - - item = image_diff_queue.get() - - logger.debug('No more work.') - - if not db_connection.is_closed(): - logger.debug('Closing thread db connection.') - db_connection.close() - - -def start_worker(): - logger.debug("Scheduling worker.") - - sched = Scheduler() - sched.start() - - sched.add_interval_job(process_work_items, seconds=30) - - while True: - time.sleep(60 * 60 * 24) # sleep one day, basically forever + return True parser = argparse.ArgumentParser(description='Worker daemon to compute diffs') @@ -74,15 +43,17 @@ parser.add_argument('--log', default='diffsworker.log', args = parser.parse_args() +worker = DiffsWorker(image_diff_queue) + if args.D: handler = logging.FileHandler(args.log) handler.setFormatter(formatter) root_logger.addHandler(handler) with daemon.DaemonContext(files_preserve=[handler.stream]): - start_worker() + worker.start() else: handler = logging.StreamHandler() handler.setFormatter(formatter) root_logger.addHandler(handler) - start_worker() \ No newline at end of file + worker.start() \ No newline at end of file diff --git a/workers/webhookworker.py b/workers/webhookworker.py new file mode 100644 index 000000000..575c11f41 --- /dev/null +++ b/workers/webhookworker.py @@ -0,0 +1,61 @@ +import logging +import daemon +import argparse +import requests +import json + +from data.queue import webhook_queue +from workers.worker import Worker + + +root_logger = logging.getLogger('') +root_logger.setLevel(logging.DEBUG) + +FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' +formatter = logging.Formatter(FORMAT) + +logger = logging.getLogger(__name__) + + +class WebhookWorker(Worker): + def process_queue_item(self, job_details): + url = job_details['url'] + payload = job_details['payload'] + headers = {'Content-type': 'application/json'} + + try: + resp = requests.post(url, data=json.dumps(payload), headers=headers) + if resp.status_code/100 != 2: + logger.error('%s response for webhook to url: %s' % (resp.status_code, + url)) + return False + except requests.exceptions.RequestException as ex: + logger.exception('Webhook was unable to be sent: %s' % ex.message) + return False + + return True + + +parser = argparse.ArgumentParser(description='Worker daemon to send webhooks') +parser.add_argument('-D', action='store_true', default=False, + help='Run the worker in daemon mode.') +parser.add_argument('--log', default='webhooks.log', + help='Specify the log file for the worker as a daemon.') +args = parser.parse_args() + + +worker = WebhookWorker(webhook_queue, poll_period_seconds=15, + reservation_seconds=3600) + +if args.D: + handler = logging.FileHandler(args.log) + handler.setFormatter(formatter) + root_logger.addHandler(handler) + with daemon.DaemonContext(files_preserve=[handler.stream]): + worker.start() + +else: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + root_logger.addHandler(handler) + worker.start() \ No newline at end of file diff --git a/workers/worker.py b/workers/worker.py new file mode 100644 index 000000000..be4984bdd --- /dev/null +++ b/workers/worker.py @@ -0,0 +1,53 @@ +import logging +import json + +from threading import Event +from apscheduler.scheduler import Scheduler + + +logger = logging.getLogger(__name__) + + +class Worker(object): + def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300): + self._sched = Scheduler() + self._poll_period_seconds = poll_period_seconds + self._reservation_seconds = reservation_seconds + self._stop = Event() + self._queue = queue + + def process_queue_item(self, job_details): + """ Return True if complete, False if it should be retried. """ + raise NotImplementedError('Workers must implement run.') + + def poll_queue(self): + logger.debug('Getting work item from queue.') + + item = self._queue.get() + while item: + logger.debug('Queue gave us some work: %s' % item.body) + + job_details = json.loads(item.body) + + if self.process_queue_item(job_details): + self._queue.complete(item) + else: + logger.warning('An error occurred processing request: %s' % item.body) + self._queue.incomplete(item) + + item = self._queue.get(processing_time=self._reservation_seconds) + + logger.debug('No more work.') + + def start(self): + logger.debug("Scheduling worker.") + + self._sched.start() + self._sched.add_interval_job(self.poll_queue, + seconds=self._poll_period_seconds) + + while not self._stop.wait(1): + pass + + def join(self): + self._stop.set()