- Add web hook queue code back in. We'll remove it and turn it off after this CL goes to prod
- Make notification lookup always be by repo and its UUID, rather than the internal DB ID - Add the init script for the notification worker
This commit is contained in:
parent
1c7d72914b
commit
49801bc2c4
11 changed files with 37 additions and 56 deletions
|
@ -36,6 +36,9 @@ ADD conf/init/runmigration.sh /etc/my_init.d/
|
||||||
ADD conf/init/gunicorn /etc/service/gunicorn
|
ADD conf/init/gunicorn /etc/service/gunicorn
|
||||||
ADD conf/init/nginx /etc/service/nginx
|
ADD conf/init/nginx /etc/service/nginx
|
||||||
ADD conf/init/diffsworker /etc/service/diffsworker
|
ADD conf/init/diffsworker /etc/service/diffsworker
|
||||||
|
ADD conf/init/notificationworker /etc/service/notificationworker
|
||||||
|
|
||||||
|
# TODO: Remove this after the prod CL push
|
||||||
ADD conf/init/webhookworker /etc/service/webhookworker
|
ADD conf/init/webhookworker /etc/service/webhookworker
|
||||||
|
|
||||||
# Download any external libs.
|
# Download any external libs.
|
||||||
|
|
3
app.py
3
app.py
|
@ -80,6 +80,9 @@ dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf
|
||||||
reporter=queue_metrics.report)
|
reporter=queue_metrics.report)
|
||||||
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
|
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
|
||||||
|
|
||||||
|
# TODO: Remove this in the prod push following the notifications change.
|
||||||
|
webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'], tf)
|
||||||
|
|
||||||
database.configure(app.config)
|
database.configure(app.config)
|
||||||
model.config.app_config = app.config
|
model.config.app_config = app.config
|
||||||
model.config.store = storage
|
model.config.store = storage
|
||||||
|
|
2
conf/init/notificationworker/log/run
Executable file
2
conf/init/notificationworker/log/run
Executable file
|
@ -0,0 +1,2 @@
|
||||||
|
#!/bin/sh
|
||||||
|
exec svlogd -t /var/log/notificationworker/
|
8
conf/init/notificationworker/run
Executable file
8
conf/init/notificationworker/run
Executable file
|
@ -0,0 +1,8 @@
|
||||||
|
#! /bin/bash
|
||||||
|
|
||||||
|
echo 'Starting notification worker'
|
||||||
|
|
||||||
|
cd /
|
||||||
|
venv/bin/python -m workers.notificationworker
|
||||||
|
|
||||||
|
echo 'Notification worker exited'
|
|
@ -125,6 +125,9 @@ class DefaultConfig(object):
|
||||||
DIFFS_QUEUE_NAME = 'imagediff'
|
DIFFS_QUEUE_NAME = 'imagediff'
|
||||||
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
|
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
|
||||||
|
|
||||||
|
# TODO: Remove this in the prod push following the notifications change.
|
||||||
|
WEBHOOK_QUEUE_NAME = 'webhook'
|
||||||
|
|
||||||
# Super user config. Note: This MUST BE an empty list for the default config.
|
# Super user config. Note: This MUST BE an empty list for the default config.
|
||||||
SUPER_USERS = []
|
SUPER_USERS = []
|
||||||
|
|
||||||
|
|
|
@ -56,10 +56,6 @@ class InvalidRepositoryBuildException(DataModelException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class InvalidWebhookException(DataModelException):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidNotificationException(DataModelException):
|
class InvalidNotificationException(DataModelException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -1552,13 +1548,6 @@ def create_repo_notification(repo, event_name, method_name, config):
|
||||||
config_json=json.dumps(config))
|
config_json=json.dumps(config))
|
||||||
|
|
||||||
|
|
||||||
def lookup_repo_notification(notification_id):
|
|
||||||
try:
|
|
||||||
return RepositoryNotification.get(RepositoryNotification.id == notification_id)
|
|
||||||
except RepositoryNotification.DoesNotExist:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def get_repo_notification(namespace_name, repository_name, uuid):
|
def get_repo_notification(namespace_name, repository_name, uuid):
|
||||||
joined = RepositoryNotification.select().join(Repository)
|
joined = RepositoryNotification.select().join(Repository)
|
||||||
found = list(joined.where(Repository.namespace == namespace_name,
|
found = list(joined.where(Repository.namespace == namespace_name,
|
||||||
|
@ -1588,34 +1577,6 @@ def list_repo_notifications(namespace_name, repository_name, event_name=None):
|
||||||
|
|
||||||
return where
|
return where
|
||||||
|
|
||||||
# TODO: remove webhook methods when no longer used.
|
|
||||||
def create_webhook(repo, params_obj):
|
|
||||||
return Webhook.create(repository=repo, parameters=json.dumps(params_obj))
|
|
||||||
|
|
||||||
|
|
||||||
def get_webhook(namespace_name, repository_name, public_id):
|
|
||||||
joined = Webhook.select().join(Repository)
|
|
||||||
found = list(joined.where(Repository.namespace == namespace_name,
|
|
||||||
Repository.name == repository_name,
|
|
||||||
Webhook.public_id == public_id))
|
|
||||||
|
|
||||||
if not found:
|
|
||||||
raise InvalidWebhookException('No webhook found with id: %s' % public_id)
|
|
||||||
|
|
||||||
return found[0]
|
|
||||||
|
|
||||||
|
|
||||||
def list_webhooks(namespace_name, repository_name):
|
|
||||||
joined = Webhook.select().join(Repository)
|
|
||||||
return joined.where(Repository.namespace == namespace_name,
|
|
||||||
Repository.name == repository_name)
|
|
||||||
|
|
||||||
|
|
||||||
def delete_webhook(namespace_name, repository_name, public_id):
|
|
||||||
webhook = get_webhook(namespace_name, repository_name, public_id)
|
|
||||||
webhook.delete_instance()
|
|
||||||
return webhook
|
|
||||||
|
|
||||||
|
|
||||||
def list_logs(start_time, end_time, performer=None, repository=None, namespace=None):
|
def list_logs(start_time, end_time, performer=None, repository=None, namespace=None):
|
||||||
joined = LogEntry.select().join(User)
|
joined = LogEntry.select().join(User)
|
||||||
|
|
|
@ -8,6 +8,7 @@ from endpoints.api import (RepositoryParamResource, nickname, resource, require_
|
||||||
from endpoints.notificationevent import NotificationEvent
|
from endpoints.notificationevent import NotificationEvent
|
||||||
from endpoints.notificationmethod import (NotificationMethod,
|
from endpoints.notificationmethod import (NotificationMethod,
|
||||||
CannotValidateNotificationMethodException)
|
CannotValidateNotificationMethodException)
|
||||||
|
from endpoints.notificationhelper import build_notification_data
|
||||||
from data import model
|
from data import model
|
||||||
|
|
||||||
|
|
||||||
|
@ -134,11 +135,7 @@ class TestRepositoryNotification(RepositoryParamResource):
|
||||||
|
|
||||||
event_info = NotificationEvent.get_event(notification.event.name)
|
event_info = NotificationEvent.get_event(notification.event.name)
|
||||||
sample_data = event_info.get_sample_data(repository=notification.repository)
|
sample_data = event_info.get_sample_data(repository=notification.repository)
|
||||||
notification_data = {
|
notification_data = build_notification_data(notification, sample_data)
|
||||||
'notification_id': notification.id,
|
|
||||||
'repository_id': notification.repository.id,
|
|
||||||
'event_data': sample_data
|
|
||||||
}
|
|
||||||
notification_queue.put([namespace, repository, notification.event.name],
|
notification_queue.put([namespace, repository, notification.event.name],
|
||||||
json.dumps(notification_data))
|
json.dumps(notification_data))
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,7 @@ class RepoPushEvent(NotificationEvent):
|
||||||
Tags Updated: %s
|
Tags Updated: %s
|
||||||
""" % (event_data['homepage'],
|
""" % (event_data['homepage'],
|
||||||
event_data['repository'],
|
event_data['repository'],
|
||||||
event_data['updated_tags'])
|
', '.join(event_data['updated_tags']))
|
||||||
|
|
||||||
return html
|
return html
|
||||||
|
|
||||||
|
|
|
@ -27,17 +27,20 @@ def build_event_data(repo, extra_data={}, subpage=None):
|
||||||
event_data.update(extra_data)
|
event_data.update(extra_data)
|
||||||
return event_data
|
return event_data
|
||||||
|
|
||||||
|
def build_notification_data(notification, event_data):
|
||||||
|
return {
|
||||||
|
'notification_uuid': notification.uuid,
|
||||||
|
'repository_namespace': notification.repository.namespace,
|
||||||
|
'repository_name': notification.repository.name,
|
||||||
|
'event_data': event_data
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def spawn_notification(repo, event_name, extra_data={}, subpage=None, pathargs=[]):
|
def spawn_notification(repo, event_name, extra_data={}, subpage=None, pathargs=[]):
|
||||||
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
|
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
|
||||||
|
|
||||||
notifications = model.list_repo_notifications(repo.namespace, repo.name, event_name=event_name)
|
notifications = model.list_repo_notifications(repo.namespace, repo.name, event_name=event_name)
|
||||||
for notification in notifications:
|
for notification in notifications:
|
||||||
notification_data = {
|
notification_data = build_notification_data(notification, event_data)
|
||||||
'notification_id': notification.id,
|
|
||||||
'repository_id': repo.id,
|
|
||||||
'event_data': event_data
|
|
||||||
}
|
|
||||||
|
|
||||||
path = [repo.namespace, repo.name, event_name] + pathargs
|
path = [repo.namespace, repo.name, event_name] + pathargs
|
||||||
notification_queue.put(path, json.dumps(notification_data))
|
notification_queue.put(path, json.dumps(notification_data))
|
||||||
|
|
|
@ -100,8 +100,7 @@ class QuayNotificationMethod(NotificationMethod):
|
||||||
|
|
||||||
|
|
||||||
def perform(self, notification, event_handler, notification_data):
|
def perform(self, notification, event_handler, notification_data):
|
||||||
repository_id = notification_data['repository_id']
|
repository = notification.repository
|
||||||
repository = model.lookup_repository(repository_id)
|
|
||||||
if not repository:
|
if not repository:
|
||||||
# Probably deleted.
|
# Probably deleted.
|
||||||
return True
|
return True
|
||||||
|
|
|
@ -22,9 +22,11 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class NotificationWorker(Worker):
|
class NotificationWorker(Worker):
|
||||||
def process_queue_item(self, job_details):
|
def process_queue_item(self, job_details):
|
||||||
notification_id = job_details['notification_id'];
|
notification_uuid = job_details['notification_uuid'];
|
||||||
notification = model.lookup_repo_notification(notification_id)
|
repo_namespace = job_details['repository_namespace']
|
||||||
|
repo_name = job_details['repository_name']
|
||||||
|
|
||||||
|
notification = model.get_repo_notification(repo_namespace, repo_name, notification_uuid)
|
||||||
if not notification:
|
if not notification:
|
||||||
# Probably deleted.
|
# Probably deleted.
|
||||||
return True
|
return True
|
||||||
|
|
Reference in a new issue