From f07690956dc7c562ffaa851168d623378f274f93 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 15:49:26 -0500 Subject: [PATCH 01/10] Add a fixed number of retries to each item that gets put in the work queue.: --- data/database.py | 6 ++++++ data/queue.py | 8 ++++++-- test/data/test.db | Bin 96256 -> 97280 bytes 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/data/database.py b/data/database.py index fc5a68454..e6bda976a 100644 --- a/data/database.py +++ b/data/database.py @@ -104,6 +104,11 @@ class Repository(BaseModel): ) +class Webhook(BaseModel): + repository = ForeignKeyField(Repository) + parameters = TextField() + + class Role(BaseModel): name = CharField(index=True) @@ -198,6 +203,7 @@ class QueueItem(BaseModel): available_after = DateTimeField(default=datetime.now, index=True) available = BooleanField(default=True, index=True) processing_expires = DateTimeField(null=True, index=True) + retries_remaining = IntegerField(default=5) all_models = [User, Repository, Image, AccessToken, Role, diff --git a/data/queue.py b/data/queue.py index 8e63c4d17..5f292a1d7 100644 --- a/data/queue.py +++ b/data/queue.py @@ -7,7 +7,7 @@ class WorkQueue(object): def __init__(self, queue_name): self.queue_name = queue_name - def put(self, message, available_after=0): + def put(self, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. @@ -16,6 +16,7 @@ class WorkQueue(object): params = { 'queue_name': self.queue_name, 'body': message, + 'retries_remaining': retries_remaining, } if available_after: @@ -37,7 +38,8 @@ class WorkQueue(object): # have one worker. avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, QueueItem.available_after <= now, - available_or_expired) + available_or_expired, + QueueItem.retries_remaining > 0) found = list(avail.limit(1).order_by(QueueItem.available_after)) @@ -45,6 +47,7 @@ class WorkQueue(object): item = found[0] item.available = False item.processing_expires = now + timedelta(seconds=processing_time) + item.retries_remaining -= 1 item.save() return item @@ -61,3 +64,4 @@ class WorkQueue(object): image_diff_queue = WorkQueue('imagediff') dockerfile_build_queue = WorkQueue('dockerfilebuild') +webhook_queue = WorkQueue('webhook') diff --git a/test/data/test.db b/test/data/test.db index 20ed3819d36d71d54cdf20cd477623d14002ea5f..fc3f5c088710af863c3692ba8f133ab1364f8a62 100644 GIT binary patch delta 1567 zcmbu9|4&m_6u{qk_t6%p2uOt~lTuXVTY3FPDd@Dt(n3qWfA>pQXa^5kT1rvCY!T-V z+kTkJHL1VMWcX!E++yU-U>2GDw4XLL8&k7ni8EQu5@0MQ&NyOr3vv4cc5~n7oOjO6 zxi{yWdrz+BoqUmZwhF@Mq~^*3beAH-jQ&Q?(Vu7uJw;E@FX$2a5#8lWm>$%u*Af3- z4S`lcz%~)o*Ampo2)0!cY$+osC?d!g%rZ=6v4A>5QD>kGPSKapCNzmlm=jnheO7R4 z2NIT7Rl(^(+)}-|yHP4j$1-UyHX2UH5)p2g8}64$yKEk#$>@^W>>jDjYqj#bWk;c6 z6A3S(Iw&&lKm&}x@AQ6TLZ6b@4f&HoVfp?ln5x07tO8%DIaEF<5|&FOFs02zdgHNl za^oxHr}osNnVrmRbD zmrWZPll$N4J$P^kkF?cld;{a!pvB!kY3@mKDu=yCukZ84deu68EMu`N2a+A`(82CR zN6eWB462St`kE)S5&V5yohF&o_VwG+iIzlvpDjK-kZ_uYyfN*lBJDHl62?~65gi*1 zxZII3jiIY`e8@L$nuy?r_FbBhj&ZKfsTuWq{fb1?t@XEdN3C|nxT|-l|5()S>@jBg zJsxLAy3b&7_D^=HGnpud?^!DFC+*ubk#vGn`Q38eupyM`>Krj@Y)Q2}nHu7BR&U(V z!Wy{_!TcPEXLU&gmuiT7$RJRxBv5 zmcSS{9F2{SjmFa9TovOvtqhl}NU+J_0Li5b4m4vH&7K#Cc){V9!h(%bhA1k8MR5nd z;2zk~vsF;OO9ILDNSRn95zdt?%tdqhcKkSQfl9tH@iWZ}qA!KBDS=w-rzk&l6TYNt z>FcBkjJ)S^kfv?5czIF1N#5Zy>ODrOM{lqi@tNgq*!|A z&=BA8=>>*o9Ll&FY6jB|KX|}=z(TDeY+M~*Qb#|!g2XKyW_1ONBLGYjJ4^^;L7d4yn|w$i_ns!P%P1xoysY}d}0eiH$a zN?2}cfGHIYFG}#)JC(3+BkLdC*@7J_#rXbWerY@U_zzchz|40daURJ<5&AmU7e{&ViyO#!A7A3+39K}9R3H9aL11T delta 1504 zcmbu9e@s(X6vy9r_d#nx23md$m=wh!2<_{S*B^9>ZDEvJ`k?gDA1K(;+FGH2bkgc# zB>IPcm`%Bh-9H-BEo_;~T<8{-1xnri*gq~yrZH1ovMtM&I3zlg&1}Z)mi;mJ_g+pu z=iYmA&Ux>9Zr;hq1t%XAoU4PO^D=Ao26TfWj~V@menSt@D*74yfWAfFpc|Po#)X<+ z(vqlFNx-%cG&d4#+e%PdOHf`;Q1l$Z2GKmj)NT?{28vh~TIfdlJpB}%#%0Vl{JQLs zWF~<`m34J+svV!Le~?LU9Rh4=cs0F)soF>ih^Z(=>=x9*8*q>Q71_}hlIz6F4HY2C z+-hh>xXCKax9lp>ygZ_F27@DA+@#r~2`2^Hq)Q0!W>chVTzz!4pfyJLKJ?k+?q;J{C>dVgq5;;|O(mUEz+&fj-Y< zWJ;R|*$#T;_{+8??iGDh?-gP~xa;WDwBD;q2BRJyADH4t^j58)9q5gjxK5pN?1td~r4r7(1++jvS1{)#FhQA4$Yb9GhS}gHD6JKcduWOgK&QV0^@`Kcvw*;=G%STEo-F zlFlCexWi)5h9iDeX4NPyp1saw82b~^tgNuKf}$#@gYX(%Mqfa0QWQR6t%1#WPklMQ zYV8GSx*K6%Fp_qarXyl;ejFm|GYU(By;#OXxAasLR&Fnc@k6m-C=n0&$A*uLkA{X1 z1@r4_JQLiE9|iSLlV4@pLtoHu6;5c1T@>Y_=HWfMfqtJ%(wuR>?WO6yMm%_XE66h^ zZp&f*U8Xq0+`NqT89c6jm$}pG6YQ2gtG;i*ZgZR59ZHkW=4Cr=8pnXkYj*Lx-K%B$ zm0laKC9?B6pOf!0xXi3jsA84sR;99))yY)qHkF}`)ykEePODea^LKN|`F|VaMJb@p zQN;d$1pJ65$v2&#BRlX1cN#&luH`RxTA&G6t+1fP*H&t9a778t_`*svsK~w@os#Ut z>eXgYXNFe&04+FoUk@7maIp&Cy1zNqPD5d8PaR0GX02*AN5=QvlcMwk@uq)z3lt?# z_h=)19qlK*Eof#_E9TeQfLpH+S}Vs3YdcFEa#1D6!Rhw6KQJ1?^-EIx^_&!RcxbgM z^Vb}JWTr$Rs_f)oMuAUet8vF-4d_XUZTQq8D`uY!4KIuE-9;%FNVs3D6jk<-tgiK} zD@z;k)vOfS)-$gxwWU82r*p-#R;HwY8l#9kh0p0T=pFK^gQTVE-@5g|DWrY5LgRnLO?k}NTt;*}}2l zsAV`n2k6`A08ykf^*g|eU`pixq^S>S*tkHZe+?D_Ti_BD=gSw+r}O297K+7ioCaU& ij0E<=KM+#Mx7$dAE1rlOnK}E-*59GWa)G^1s6X From 14263de7f8924e52867702ca65ed42e3222fa36e Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 15:50:20 -0500 Subject: [PATCH 02/10] Extract some boilerplate from the worker and create a base class. Port the diffs worker over to the base. --- workers/diffsworker.py | 51 +++++++++------------------------------- workers/worker.py | 53 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 40 deletions(-) create mode 100644 workers/worker.py diff --git a/workers/diffsworker.py b/workers/diffsworker.py index cacf48856..1d6ee56af 100644 --- a/workers/diffsworker.py +++ b/workers/diffsworker.py @@ -1,15 +1,11 @@ import logging -import json import daemon -import time import argparse -from apscheduler.scheduler import Scheduler - from data.queue import image_diff_queue -from data.database import db as db_connection from data.model import DataModelException from endpoints.registry import process_image_changes +from workers.worker import Worker root_logger = logging.getLogger('') @@ -21,20 +17,13 @@ formatter = logging.Formatter(FORMAT) logger = logging.getLogger(__name__) -def process_work_items(): - logger.debug('Getting work item from queue.') +class DiffsWorker(Worker): + def process_queue_item(self, job_details): + image_id = job_details['image_id'] + namespace = job_details['namespace'] + repository = job_details['repository'] - item = image_diff_queue.get() - - while item: - logger.debug('Queue gave us some work: %s' % item.body) - - request = json.loads(item.body) try: - image_id = request['image_id'] - namespace = request['namespace'] - repository = request['repository'] - process_image_changes(namespace, repository, image_id) except DataModelException: # This exception is unrecoverable, and the item should continue and be @@ -43,27 +32,7 @@ def process_work_items(): (image_id, namespace, repository)) logger.warning(msg) - image_diff_queue.complete(item) - - item = image_diff_queue.get() - - logger.debug('No more work.') - - if not db_connection.is_closed(): - logger.debug('Closing thread db connection.') - db_connection.close() - - -def start_worker(): - logger.debug("Scheduling worker.") - - sched = Scheduler() - sched.start() - - sched.add_interval_job(process_work_items, seconds=30) - - while True: - time.sleep(60 * 60 * 24) # sleep one day, basically forever + return True parser = argparse.ArgumentParser(description='Worker daemon to compute diffs') @@ -74,15 +43,17 @@ parser.add_argument('--log', default='diffsworker.log', args = parser.parse_args() +worker = DiffsWorker(image_diff_queue) + if args.D: handler = logging.FileHandler(args.log) handler.setFormatter(formatter) root_logger.addHandler(handler) with daemon.DaemonContext(files_preserve=[handler.stream]): - start_worker() + worker.start() else: handler = logging.StreamHandler() handler.setFormatter(formatter) root_logger.addHandler(handler) - start_worker() \ No newline at end of file + worker.start() \ No newline at end of file diff --git a/workers/worker.py b/workers/worker.py new file mode 100644 index 000000000..be4984bdd --- /dev/null +++ b/workers/worker.py @@ -0,0 +1,53 @@ +import logging +import json + +from threading import Event +from apscheduler.scheduler import Scheduler + + +logger = logging.getLogger(__name__) + + +class Worker(object): + def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300): + self._sched = Scheduler() + self._poll_period_seconds = poll_period_seconds + self._reservation_seconds = reservation_seconds + self._stop = Event() + self._queue = queue + + def process_queue_item(self, job_details): + """ Return True if complete, False if it should be retried. """ + raise NotImplementedError('Workers must implement run.') + + def poll_queue(self): + logger.debug('Getting work item from queue.') + + item = self._queue.get() + while item: + logger.debug('Queue gave us some work: %s' % item.body) + + job_details = json.loads(item.body) + + if self.process_queue_item(job_details): + self._queue.complete(item) + else: + logger.warning('An error occurred processing request: %s' % item.body) + self._queue.incomplete(item) + + item = self._queue.get(processing_time=self._reservation_seconds) + + logger.debug('No more work.') + + def start(self): + logger.debug("Scheduling worker.") + + self._sched.start() + self._sched.add_interval_job(self.poll_queue, + seconds=self._poll_period_seconds) + + while not self._stop.wait(1): + pass + + def join(self): + self._stop.set() From 41c92deb0d9ff5580cc02f47a012f485f7fa8f88 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 16:12:12 -0500 Subject: [PATCH 03/10] Fix a bunch of errors that pylint was complaining about. --- endpoints/api.py | 119 +++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/endpoints/api.py b/endpoints/api.py index 8f93bd0d8..e3f2dab82 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -1,12 +1,11 @@ import logging import stripe -import re import requests import urlparse import json from flask import request, make_response, jsonify, abort -from flask.ext.login import login_required, current_user, logout_user +from flask.ext.login import current_user, logout_user from flask.ext.principal import identity_changed, AnonymousIdentity from functools import wraps from collections import defaultdict @@ -41,7 +40,8 @@ def api_login_required(f): if not current_user.is_authenticated(): abort(401) - if current_user and current_user.db_user() and current_user.db_user().organization: + if (current_user and current_user.db_user() and + current_user.db_user().organization): abort(401) return f(*args, **kwargs) @@ -144,7 +144,7 @@ def convert_user_to_organization(): def change_user_details(): user = current_user.db_user() - user_data = request.get_json(); + user_data = request.get_json() try: if 'password' in user_data: @@ -264,7 +264,7 @@ def get_matching_entities(prefix): if permission.can(): try: organization = model.get_organization(organization_name) - except: + except model.InvalidOrganizationException: pass if organization: @@ -272,7 +272,7 @@ def get_matching_entities(prefix): users = model.get_matching_users(prefix, organization) - def team_view(team): + def entity_team_view(team): result = { 'name': team.name, 'kind': 'team', @@ -291,20 +291,20 @@ def get_matching_entities(prefix): return user_json - team_data = [team_view(team) for team in teams] + team_data = [entity_team_view(team) for team in teams] user_data = [user_view(user) for user in users] return jsonify({ 'results': team_data + user_data }) -def team_view(orgname, t): - view_permission = ViewTeamPermission(orgname, t.name) - role = model.get_team_org_role(t).name +def team_view(orgname, team): + view_permission = ViewTeamPermission(orgname, team.name) + role = model.get_team_org_role(team).name return { - 'id': t.id, - 'name': t.name, - 'description': t.description, + 'id': team.id, + 'name': team.name, + 'description': team.description, 'can_view': view_permission.can(), 'role': role } @@ -317,8 +317,9 @@ def create_organization_api(): existing = None try: - existing = model.get_organization(org_data['name']) or model.get_user(org_data['name']) - except: + existing = (model.get_organization(org_data['name']) or + model.get_user(org_data['name'])) + except model.InvalidOrganizationException: pass if existing: @@ -329,8 +330,8 @@ def create_organization_api(): return error_resp try: - organization = model.create_organization(org_data['name'], org_data['email'], - current_user.db_user()) + model.create_organization(org_data['name'], org_data['email'], + current_user.db_user()) return make_response('Created', 201) except model.DataModelException as ex: error_resp = jsonify({ @@ -345,15 +346,13 @@ def create_organization_api(): def get_organization(orgname): permission = OrganizationMemberPermission(orgname) if permission.can(): - user = current_user.db_user() - - def org_view(o, teams): + def org_view(org, teams): admin_org = AdministerOrganizationPermission(orgname) is_admin = admin_org.can() return { - 'name': o.username, - 'email': o.email if is_admin else '', - 'gravatar': compute_hash(o.email), + 'name': org.username, + 'email': org.email if is_admin else '', + 'gravatar': compute_hash(org.email), 'teams': {t.name : team_view(orgname, t) for t in teams}, 'is_admin': is_admin } @@ -372,7 +371,7 @@ def get_organization(orgname): @api_login_required def get_organization_members(orgname): permission = AdministerOrganizationPermission(orgname) - if permission.can(): + if permission.can(): try: org = model.get_organization(orgname) except model.InvalidOrganizationException: @@ -385,9 +384,10 @@ def get_organization_members(orgname): members = model.get_organization_members_with_teams(org) for member in members: if not member.user.username in members_dict: - members_dict[member.user.username] = {'username': member.user.username, 'teams': []} + members_dict[member.user.username] = {'username': member.user.username, + 'teams': []} - members_dict[member.user.username]['teams'].append(member.team.name) + members_dict[member.user.username]['teams'].append(member.team.name) return jsonify({'members': members_dict}) @@ -416,9 +416,9 @@ def get_organization_private_allowed(orgname): abort(403) -def member_view(m): +def member_view(member): return { - 'username': m.username + 'username': member.username } @@ -430,25 +430,25 @@ def update_organization_team(orgname, teamname): if edit_permission.can(): team = None - json = request.get_json() + details = request.get_json() is_existing = False try: team = model.get_organization_team(orgname, teamname) is_existing = True - except: + except model.InvalidTeamException: # Create the new team. - description = json['description'] if 'description' in json else '' - role = json['role'] if 'role' in json else 'member' + description = details['description'] if 'description' in details else '' + role = details['role'] if 'role' in details else 'member' org = model.get_organization(orgname) team = model.create_team(teamname, org, role, description) if is_existing: - if 'description' in json: - team.description = json['description'] + if 'description' in details: + team.description = details['description'] team.save() - if 'role' in json: - team = model.set_team_org_permission(team, json['role'], + if 'role' in details: + team = model.set_team_org_permission(team, details['role'], current_user.db_user().username) resp = jsonify(team_view(orgname, team)) @@ -479,12 +479,10 @@ def get_organization_team_members(orgname, teamname): edit_permission = AdministerOrganizationPermission(orgname) if view_permission.can(): - user = current_user.db_user() team = None - try: team = model.get_organization_team(orgname, teamname) - except: + except model.InvalidTeamException: abort(404) members = model.get_organization_team_members(team.id) @@ -508,7 +506,7 @@ def update_organization_team_member(orgname, teamname, membername): # Find the team. try: team = model.get_organization_team(orgname, teamname) - except: + except model.InvalidTeamException: abort(404) # Find the user. @@ -542,23 +540,23 @@ def delete_organization_team_member(orgname, teamname, membername): @api_login_required def create_repo_api(): owner = current_user.db_user() - json = request.get_json() - namespace_name = json['namespace'] if 'namespace' in json else owner.username + req = request.get_json() + namespace_name = req['namespace'] if 'namespace' in req else owner.username permission = CreateRepositoryPermission(namespace_name) if permission.can(): - repository_name = json['repository'] - visibility = json['visibility'] + repository_name = req['repository'] + visibility = req['visibility'] existing = model.get_repository(namespace_name, repository_name) if existing: return make_response('Repository already exists', 400) - visibility = json['visibility'] + visibility = req['visibility'] repo = model.create_repository(namespace_name, repository_name, owner, visibility) - repo.description = json['description'] + repo.description = req['description'] repo.save() return jsonify({ @@ -610,7 +608,7 @@ def list_repos_api(): try: limit = int(limit) if limit else None - except: + except TypeError: limit = None include_public = include_public == 'true' @@ -710,7 +708,7 @@ def get_repo_api(namespace, repository): organization = None try: organization = model.get_organization(namespace) - except: + except model.InvalidOrganizationException: pass permission = ReadRepositoryPermission(namespace, repository) @@ -774,18 +772,6 @@ def get_repo_builds(namespace, repository): abort(403) # Permissions denied - -@app.route('/api/filedrop/', methods=['POST']) -@api_login_required -def get_filedrop_url(): - mime_type = request.get_json()['mimeType'] - (url, file_id) = user_files.prepare_for_drop(mime_type) - return jsonify({ - 'url': url, - 'file_id': file_id - }) - - @app.route('/api/repository//build/', methods=['POST']) @api_login_required @parse_repository_name @@ -813,6 +799,17 @@ def request_repo_build(namespace, repository): abort(403) # Permissions denied +@app.route('/api/filedrop/', methods=['POST']) +@api_login_required +def get_filedrop_url(): + mime_type = request.get_json()['mimeType'] + (url, file_id) = user_files.prepare_for_drop(mime_type) + return jsonify({ + 'url': url, + 'file_id': file_id + }) + + def role_view(repo_perm_obj): return { 'role': repo_perm_obj.role.name, @@ -1276,7 +1273,7 @@ def org_invoices_api(orgname): if not organization.stripe_id: abort(404) - invoices = stripe.Invoice.all(customer=organization.stripe_id, count=12) + invoices = stripe.Invoice.all(customer=organization.stripe_id, count=12) return jsonify({ 'invoices': [invoice_view(i) for i in invoices.data] }) From 7a071fa7310dbf7fa9875339ca1744d33f45312e Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 16:44:33 -0500 Subject: [PATCH 04/10] Query for and take a queue item in a transaction. --- data/queue.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/data/queue.py b/data/queue.py index 5f292a1d7..e88c56a94 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from database import QueueItem +from data.database import QueueItem, db class WorkQueue(object): @@ -34,25 +34,24 @@ class WorkQueue(object): available_or_expired = ((QueueItem.available == True) | (QueueItem.processing_expires <= now)) - # TODO the query and the update should be atomic, but for now we only - # have one worker. - avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, - QueueItem.available_after <= now, - available_or_expired, - QueueItem.retries_remaining > 0) + with db.transaction(): + avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, + QueueItem.available_after <= now, + available_or_expired, + QueueItem.retries_remaining > 0) - found = list(avail.limit(1).order_by(QueueItem.available_after)) + found = list(avail.limit(1).order_by(QueueItem.available_after)) - if found: - item = found[0] - item.available = False - item.processing_expires = now + timedelta(seconds=processing_time) - item.retries_remaining -= 1 - item.save() + if found: + item = found[0] + item.available = False + item.processing_expires = now + timedelta(seconds=processing_time) + item.retries_remaining -= 1 + item.save() - return item + return item - return None + return None def complete(self, completed_item): completed_item.delete_instance() From c1ea6263e17d5a7f480b4ac890eaca1747ab01e3 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 16:45:02 -0500 Subject: [PATCH 05/10] Flesh out the webworkers a bit. --- data/database.py | 12 ++++---- data/model.py | 27 ++++++++++++++++++ endpoints/api.py | 52 +++++++++++++++++++++++++++++++++- test/data/test.db | Bin 97280 -> 97280 bytes workers/webhookworker.py | 59 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 workers/webhookworker.py diff --git a/data/database.py b/data/database.py index e6bda976a..18b6b8bad 100644 --- a/data/database.py +++ b/data/database.py @@ -104,11 +104,6 @@ class Repository(BaseModel): ) -class Webhook(BaseModel): - repository = ForeignKeyField(Repository) - parameters = TextField() - - class Role(BaseModel): name = CharField(index=True) @@ -135,6 +130,13 @@ def random_string_generator(length=16): return random_string +class Webhook(BaseModel): + public_id = CharField(default=random_string_generator(length=64), + unique=True, index=True) + repository = ForeignKeyField(Repository) + parameters = TextField() + + class AccessToken(BaseModel): friendly_name = CharField(null=True) code = CharField(default=random_string_generator(length=64), unique=True, diff --git a/data/model.py b/data/model.py index cfbb9340c..47af6c181 100644 --- a/data/model.py +++ b/data/model.py @@ -2,6 +2,7 @@ import bcrypt import logging import dateutil.parser import operator +import json from database import * from util.validation import * @@ -42,6 +43,10 @@ class InvalidRepositoryBuildException(DataModelException): pass +class InvalidWebhookException(DataModelException): + pass + + def create_user(username, password, email): if not validate_email(email): raise InvalidEmailAddressException('Invalid email address: %s' % email) @@ -935,3 +940,25 @@ def list_repository_builds(namespace_name, repository_name, def create_repository_build(repo, access_token, resource_key, tag): return RepositoryBuild.create(repository=repo, access_token=access_token, resource_key=resource_key, tag=tag) + + +def create_webhook(repo, params_obj): + return Webhook.create(repository=repo, parameters=json.dumps(params_obj)) + + +def get_webhook(namespace_name, repository_name, public_id): + joined = Webhook.select().join(Repository) + found = list(joined.where(Repository.namespace == namespace_name, + Repository.name == repository_name, + Webhook.public_id == public_id)) + + if not found: + raise InvalidWebhookException('No webhook found with id: %s' % public_id) + + return found[0] + + +def list_webhooks(namespace_name, repository_name): + joined = Webhook.select().join(Repository) + return joined.where(Repository.namespace == namespace_name, + Repository.name == repository_name) diff --git a/endpoints/api.py b/endpoints/api.py index e3f2dab82..d9a06e768 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -4,7 +4,7 @@ import requests import urlparse import json -from flask import request, make_response, jsonify, abort +from flask import request, make_response, jsonify, abort, url_for from flask.ext.login import current_user, logout_user from flask.ext.principal import identity_changed, AnonymousIdentity from functools import wraps @@ -799,6 +799,56 @@ def request_repo_build(namespace, repository): abort(403) # Permissions denied +def webhook_view(webhook): + return { + 'public_id': webhook.public_id, + 'parameters': json.loads(webhook.parameters), + } + + +@app.route('/api/repository//webhook/', methods=['POST']) +@api_login_required +@parse_repository_name +def create_webhook(namespace, repository): + permission = ModifyRepositoryPermission(namespace, repository) + if permission.can(): + repo = model.get_repository(namespace, repository) + webhook = model.create_webhook(repo, request.get_json()) + resp = jsonify(webhook_view(webhook)) + repo_string = '%s/%s' % (namespace, repository) + resp.headers['Location'] = url_for('get_webhook', repository=repo_string, + public_id=webhook.public_id) + + abort(403) # Permissions denied + + +@app.route('/api/repository//webhook/', + methods=['GET']) +@api_login_required +@parse_repository_name +def get_webhook(namespace, repository, public_id): + permission = ModifyRepositoryPermission(namespace, repository) + if permission.can(): + webhook = model.get_webhook(namespace, repository, public_id) + return jsonify(webhook_view(webhook)) + + abort(403) # Permission denied + + +@app.route('/api/repository//webhook/', methods=['GET']) +@api_login_required +@parse_repository_name +def list_webhooks(namespace, repository): + permission = ModifyRepositoryPermission(namespace, repository) + if permission.can(): + webhooks = model.list_webhooks(namespace, repository) + return jsonify({ + 'webhooks': [webhook_view(webhook) for webhook in webhooks] + }) + + abort(403) # Permission denied + + @app.route('/api/filedrop/', methods=['POST']) @api_login_required def get_filedrop_url(): diff --git a/test/data/test.db b/test/data/test.db index fc3f5c088710af863c3692ba8f133ab1364f8a62..c300b0c1c1a6981ac4f12223a93f5df684883c52 100644 GIT binary patch delta 469 zcmbu&OOlg7007X{=oO}1xzVaAYY5%VKV_9Lgd|OprU}plR|Ls0Oh{)iAINI%z*60K z0;^`@)&p2B-Fg5IFgx`QUf{j^$Fuv#voEg=`5+ct$|8S?(Z=PPcl4Jv7A7HRTtU=} zl}Fds@@U@{>1vqji=CUAS=-gdOSur|ciO@CaG2njmHK`i4@h3LDobP5;s6IVI-|y8{^5Wu-tY2&D`w1~deUKumLQa>wEIEM zyHEXRGC#l)P$%8+YC)HCgQ;z2xSPV@wd09^ub e3FPNMF(hQjk^nUwiRj_a&u>rP{BQ92*T=sPfR?EM delta 469 zcmbu&J(819007`>^$L!ajm|jEvOM7B0U2imLI@$@@lU{t0FoENLI~`Vt@aL-YU2r< z*~ZobIF_~^zyoZjhw!_=cmH^C|9J8Btq7w%F__$^nl#?f#9o|BRcd?cDr$~5N!@G& zwi#q?k8~3yJr#b~&SoVUTk#bPb_0qGx42uZYrF4MK+F&76Do1(nDG2hcXzn zb27e}zJy0*AgK*cp&x=Ob5Y1IvlRw|vn`984BKn9D?HCu+Eg~|;!fwfE+gYl{A~O@ zeGZehB5deV!nGWC`uwOuv*vK!ltfth2WvvBM3H;kO@c#ZO@iBaZzJ;XXZr2qdlh3C z^N|@Z<&|b}$W#5mSp(1HJ%1j|Bh3qS({mjZFtK6!TFj`vZ*X|Nv?a%2S5Fuku$KT_ e0{8+zk>f=gP!P~Gq#yqN`u_CY{{~-vfBFaMH=6AL diff --git a/workers/webhookworker.py b/workers/webhookworker.py new file mode 100644 index 000000000..73e549138 --- /dev/null +++ b/workers/webhookworker.py @@ -0,0 +1,59 @@ +import logging +import daemon +import argparse +import requests + +from data.queue import webhook_queue +from workers.worker import Worker + + +root_logger = logging.getLogger('') +root_logger.setLevel(logging.DEBUG) + +FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' +formatter = logging.Formatter(FORMAT) + +logger = logging.getLogger(__name__) + + +class WebhookWorker(Worker): + def process_queue_item(self, job_details): + url = job_details['url'] + payload = job_details['payload'] + + try: + resp = requests.post(url, data=payload) + if resp.status_code/100 != 2: + logger.error('%s response for webhook to url: %s' % (resp.status_code, + url)) + return False + except requests.exceptions.RequestException as ex: + logger.exception('Webhook was unable to be sent: %s' % ex.message) + return False + + return True + + +parser = argparse.ArgumentParser(description='Worker daemon to send webhooks') +parser.add_argument('-D', action='store_true', default=False, + help='Run the worker in daemon mode.') +parser.add_argument('--log', default='webhooks.log', + help='Specify the log file for the worker as a daemon.') +args = parser.parse_args() + + +worker = WebhookWorker(webhook_queue, poll_period_seconds=15, + reservation_seconds=3600) + +if args.D: + handler = logging.FileHandler(args.log) + handler.setFormatter(formatter) + root_logger.addHandler(handler) + with daemon.DaemonContext(files_preserve=[handler.stream]): + worker.start() + +else: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + root_logger.addHandler(handler) + worker.start() \ No newline at end of file From ecc5f8fba712aa61cb8d6e84b5c4d4847666eda7 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 15 Nov 2013 17:45:37 -0500 Subject: [PATCH 06/10] Wire up webhooks to the UI. --- data/database.py | 2 +- data/model.py | 5 ++++ endpoints/api.py | 20 +++++++++++++--- static/css/quay.css | 6 +++-- static/js/controllers.js | 27 ++++++++++++++++++++- static/partials/repo-admin.html | 41 ++++++++++++++++++++++++++++++++ test/data/test.db | Bin 97280 -> 100352 bytes 7 files changed, 94 insertions(+), 7 deletions(-) diff --git a/data/database.py b/data/database.py index 674aeebe8..1a8291d5a 100644 --- a/data/database.py +++ b/data/database.py @@ -212,4 +212,4 @@ class QueueItem(BaseModel): all_models = [User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, EmailConfirmation, FederatedLogin, LoginService, QueueItem, - RepositoryBuild, Team, TeamMember, TeamRole] + RepositoryBuild, Team, TeamMember, TeamRole, Webhook] diff --git a/data/model.py b/data/model.py index 9d10b19b3..28c910d28 100644 --- a/data/model.py +++ b/data/model.py @@ -973,3 +973,8 @@ def list_webhooks(namespace_name, repository_name): joined = Webhook.select().join(Repository) return joined.where(Repository.namespace == namespace_name, Repository.name == repository_name) + + +def delete_webhook(namespace_name, repository_name, public_id): + webhook = get_webhook(namespace_name, repository_name, public_id) + webhook.delete_instance() diff --git a/endpoints/api.py b/endpoints/api.py index 1d95bacc4..bab7ee9f0 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -841,7 +841,7 @@ def webhook_view(webhook): @api_login_required @parse_repository_name def create_webhook(namespace, repository): - permission = ModifyRepositoryPermission(namespace, repository) + permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): repo = model.get_repository(namespace, repository) webhook = model.create_webhook(repo, request.get_json()) @@ -849,6 +849,7 @@ def create_webhook(namespace, repository): repo_string = '%s/%s' % (namespace, repository) resp.headers['Location'] = url_for('get_webhook', repository=repo_string, public_id=webhook.public_id) + return resp abort(403) # Permissions denied @@ -858,7 +859,7 @@ def create_webhook(namespace, repository): @api_login_required @parse_repository_name def get_webhook(namespace, repository, public_id): - permission = ModifyRepositoryPermission(namespace, repository) + permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): webhook = model.get_webhook(namespace, repository, public_id) return jsonify(webhook_view(webhook)) @@ -870,7 +871,7 @@ def get_webhook(namespace, repository, public_id): @api_login_required @parse_repository_name def list_webhooks(namespace, repository): - permission = ModifyRepositoryPermission(namespace, repository) + permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): webhooks = model.list_webhooks(namespace, repository) return jsonify({ @@ -880,6 +881,19 @@ def list_webhooks(namespace, repository): abort(403) # Permission denied +@app.route('/api/repository//webhook/', + methods=['DELETE']) +@api_login_required +@parse_repository_name +def delete_webhook(namespace, repository, public_id): + permission = AdministerRepositoryPermission(namespace, repository) + if permission.can(): + model.delete_webhook(namespace, repository, public_id) + return make_response('No Content', 204) + + abort(403) # Permission denied + + @app.route('/api/filedrop/', methods=['POST']) @api_login_required def get_filedrop_url(): diff --git a/static/css/quay.css b/static/css/quay.css index e466e43d6..d2ba879f9 100644 --- a/static/css/quay.css +++ b/static/css/quay.css @@ -524,11 +524,13 @@ font-size: .4em; } -form input.ng-invalid.ng-dirty { +form input.ng-invalid.ng-dirty, +*[ng-form] input.ng-invalid.ng-dirty { background-color: #FDD7D9; } -form input.ng-valid.ng-dirty { +form input.ng-valid.ng-dirty, +*[ng-form] input.ng-valid.ng-dirty { background-color: #DDFFEE; } diff --git a/static/js/controllers.js b/static/js/controllers.js index e4e3d5d4f..696b2b4e9 100644 --- a/static/js/controllers.js +++ b/static/js/controllers.js @@ -559,7 +559,7 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) { }); }; - $scope.roles = [ + $scope.roles = [ { 'id': 'read', 'title': 'Read', 'kind': 'success' }, { 'id': 'write', 'title': 'Write', 'kind': 'success' }, { 'id': 'admin', 'title': 'Admin', 'kind': 'primary' } @@ -700,6 +700,31 @@ function RepoAdminCtrl($scope, Restangular, $routeParams, $rootScope) { $scope.loading = false; }); + $scope.webhooksLoading = true; + $scope.loadWebhooks = function() { + $scope.webhooksLoading = true; + var fetchWebhooks = Restangular.one('repository/' + namespace + '/' + name + '/webhook/'); + fetchWebhooks.get().then(function(resp) { + $scope.webhooks = resp.webhooks; + $scope.webhooksLoading = false; + }); + }; + + $scope.createWebhook = function() { + var newWebhook = Restangular.one('repository/' + namespace + '/' + name + '/webhook/'); + newWebhook.customPOST($scope.newWebhook).then(function(resp) { + $scope.webhooks.push(resp); + $scope.newWebhook.url = ''; + $scope.newWebhookForm.$setPristine(); + }); + }; + + $scope.deleteWebhook = function(webhook) { + var deleteWebhookReq = Restangular.one('repository/' + namespace + '/' + name + '/webhook/' + webhook.public_id); + deleteWebhookReq.customDELETE().then(function(resp) { + $scope.webhooks.splice($scope.webhooks.indexOf(webhook), 1); + }); + }; } function UserAdminCtrl($scope, $timeout, $location, Restangular, PlanService, UserService, KeyService, $routeParams) { diff --git a/static/partials/repo-admin.html b/static/partials/repo-admin.html index ada422d74..e0846a6a5 100644 --- a/static/partials/repo-admin.html +++ b/static/partials/repo-admin.html @@ -23,6 +23,7 @@
@@ -148,6 +149,46 @@
+ +
+
+ Loading webhooks: +
+ +
+
+ + + + + + + + + + + + + + + + + + +
Webhook URL
{{ webhook.parameters.url }} + + + + +
+ + + +
+
+
+
+
diff --git a/test/data/test.db b/test/data/test.db index d35f34afe8ac55fb1ec13452d0b6cf95d409ba99..6a2cb0c869049a5802a332d49c5cb6c7124f7aea 100644 GIT binary patch delta 963 zcmbu+O>YuG7zc0$u&6cB#+s@`vyy17u?q`3ER?7Py1>FNurENNYRkeFwk)s(wzl?HGhGBcU~Gs)cfcHqu~kq6GP zg2iHy(fl*6J-qA#g%?Y^L0I&6`BW*IW4N~0;P`g0v8vNb!OKSRQZO;^^{nM#`5aGkUKh1R_d6NQj9*rQ7p2y+Wg^d3+VTh_4oxr8-&k72+)pUx|m^(XvsGDikcN z@{P@=ITgA9@gSSkm{LWrZfMDP3395T&_+$vy>qQ5F4U-c+P&6P!|OG^yHQW*ftuSV zn(s0v&Bq}pOtqA5mTOjw7_%Y=3@IhmDW5_6jV@HHtwNkt4Xx+Sb2(}*&vdKHd`yn1 zgD;_1j}|gSF(JX22L_|LKrrTEgmeT(B%YZIL`jm)Q#_pr^I=BJv!!GV@???Dksg-g zQZVWDQ_P6hL-?i%Vw!*$LCuiljEBN07$k$#VDIA_$L#+$*#10v5|kh6;{)v56C1xY zi~f2sgt$Q=u6q#+(}==*3`=1w4^zE?!?lce*32` zER){Pc3~G?e@~{i@>d)aPA9l)>y>3q8QvWhX81ZO9AWAG1oJ&MIr2Ng41Yaie&3Cp o*2|r8Q|T$4t_zdsc?nBr(9E&y)Q)@>JOj_|<41Pv82GX96a5As2LJ#7 delta 685 zcmbu+Pfycu7zglm1ynFm5)K%mnFlo(*xGKtw%a_=c4hs?`gdD9)Une2DU<@68w@TS zJwkTaLB046bTQ+IS1(>n3^!kZFMxvy{y6vwe3Iw$y?vj+v9v>y2VbO?$mvjM9h>?~ziQ=V;NXwR&7v(4Fp^ z1Ah=^o`Jn1pHlTWlkfp+9I3KOiuJ>K3)^P$@h%H&+u*x_?$_9Ype8Ynb4ZaR2H!Hz z0p-X7pEHw34T`Y4iN09e1?_HYRZr)|!*tjp`eg; Date: Fri, 15 Nov 2013 17:51:20 -0500 Subject: [PATCH 07/10] Add a panel header to the webhooks panel. --- static/partials/repo-admin.html | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/static/partials/repo-admin.html b/static/partials/repo-admin.html index e0846a6a5..d9016b83a 100644 --- a/static/partials/repo-admin.html +++ b/static/partials/repo-admin.html @@ -146,17 +146,20 @@
-
-
- Loading webhooks: -
+
+
Push Webhooks + +
-
-
+
+ Loading webhooks: +
+ +
From e787d8b2d89c99a785139318022fde276345babd Mon Sep 17 00:00:00 2001 From: yackob03 Date: Sat, 16 Nov 2013 15:05:26 -0500 Subject: [PATCH 08/10] Make the worker post json. Add a retry timeout after an incompletel queue item is processed. Submit webhook jobs to the queue on a successful push. --- README.md | 1 + data/queue.py | 4 +++- endpoints/index.py | 26 ++++++++++++++++++++++++-- workers/webhookworker.py | 4 +++- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 5a43f8325..35ea35a87 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ start the workers: ``` STACK=prod python -m workers.diffsworker -D STACK=prod python -m workers.dockerfilebuild -D +STACK=prod python -m workers.webhookworker -D ``` bouncing the servers: diff --git a/data/queue.py b/data/queue.py index e88c56a94..ef0026e52 100644 --- a/data/queue.py +++ b/data/queue.py @@ -56,7 +56,9 @@ class WorkQueue(object): def complete(self, completed_item): completed_item.delete_instance() - def incomplete(self, incomplete_item): + def incomplete(self, incomplete_item, retry_after=300): + retry_date = datetime.now() + timedelta(seconds=retry_after) + incomplete_item.available_after = retry_date incomplete_item.available = True incomplete_item.save() diff --git a/endpoints/index.py b/endpoints/index.py index 0fb1ad990..1fa05197a 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -6,6 +6,7 @@ from flask import request, make_response, jsonify, abort from functools import wraps from data import model +from data.queue import webhook_queue from app import app, mixpanel from auth.auth import (process_auth, get_authenticated_user, get_validated_token) @@ -178,18 +179,39 @@ def update_images(namespace, repository): permission = ModifyRepositoryPermission(namespace, repository) if permission.can(): - repository = model.get_repository(namespace, repository) - if not repository: + repo = model.get_repository(namespace, repository) + if not repo: # Make sure the repo actually exists. abort(404) image_with_checksums = json.loads(request.data) + updated_tags = {} for image in image_with_checksums: logger.debug('Setting checksum for image id: %s to %s' % (image['id'], image['checksum'])) + updated_tags[image['Tag']] = image['id'] model.set_image_checksum(image['id'], repository, image['checksum']) + # Generate a job for each webhook that has been added to this repo + webhooks = model.list_webhooks(namespace, repository) + for webhook in webhooks: + webhook_data = json.loads(webhook.parameters) + repo_string = '%s/%s' % (namespace, repository) + logger.debug('Creating webhook for repository \'%s\' for url \'%s\'' % + (repo_string, webhook_data['url'])) + webhook_data['payload'] = { + 'repository': repo_string, + 'namespace': namespace, + 'name': repository, + 'docker_url': 'quay.io/%s' % repo_string, + 'homepage': 'https://quay.io/repository/%s' % repo_string, + 'visibility': repo.visibility.name, + 'updated_tags': updated_tags, + 'pushed_image_count': len(image_with_checksums), + } + webhook_queue.put(json.dumps(webhook_data)) + return make_response('Updated', 204) abort(403) diff --git a/workers/webhookworker.py b/workers/webhookworker.py index 73e549138..575c11f41 100644 --- a/workers/webhookworker.py +++ b/workers/webhookworker.py @@ -2,6 +2,7 @@ import logging import daemon import argparse import requests +import json from data.queue import webhook_queue from workers.worker import Worker @@ -20,9 +21,10 @@ class WebhookWorker(Worker): def process_queue_item(self, job_details): url = job_details['url'] payload = job_details['payload'] + headers = {'Content-type': 'application/json'} try: - resp = requests.post(url, data=payload) + resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: logger.error('%s response for webhook to url: %s' % (resp.status_code, url)) From 1f4834ad97c51b5cabfdccd08169a03a69390abd Mon Sep 17 00:00:00 2001 From: yackob03 Date: Sun, 17 Nov 2013 17:24:56 -0500 Subject: [PATCH 09/10] Add a tool to compute our monthly revenue. --- tools/__init__.py | 0 tools/monthlyrevenue.py | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 tools/__init__.py create mode 100644 tools/monthlyrevenue.py diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tools/monthlyrevenue.py b/tools/monthlyrevenue.py new file mode 100644 index 000000000..9472eeda6 --- /dev/null +++ b/tools/monthlyrevenue.py @@ -0,0 +1,19 @@ +from app import stripe + +EXCLUDE_CID = {'cus_2iVlmwz8CpHgOj'} + +offset = 0 +total_monthly_revenue = 0 + +batch = stripe.Customer.all(count=100, offset=offset) +while batch.data: + for cust in batch.data: + if cust.id not in EXCLUDE_CID and cust.subscription: + sub = cust.subscription + total_monthly_revenue += sub.plan.amount * sub.quantity + offset += len(batch.data) + batch = stripe.Customer.all(count=100, offset=offset) + +dollars = total_monthly_revenue / 100 +cents = total_monthly_revenue % 100 +print 'Monthly revenue: $%d.%02d' % (dollars, cents) From 98a77299b8a2767ed3accc629d480c795aee74e8 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Mon, 18 Nov 2013 12:12:35 -0500 Subject: [PATCH 10/10] Fix a bug in the registry which tried to use a string as a repo object. --- endpoints/index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/endpoints/index.py b/endpoints/index.py index 1fa05197a..207ace697 100644 --- a/endpoints/index.py +++ b/endpoints/index.py @@ -191,7 +191,7 @@ def update_images(namespace, repository): logger.debug('Setting checksum for image id: %s to %s' % (image['id'], image['checksum'])) updated_tags[image['Tag']] = image['id'] - model.set_image_checksum(image['id'], repository, image['checksum']) + model.set_image_checksum(image['id'], repo, image['checksum']) # Generate a job for each webhook that has been added to this repo webhooks = model.list_webhooks(namespace, repository)