This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/notificationhelper.py

63 lines
2.2 KiB
Python
Raw Normal View History

from app import app, notification_queue
from data import model
from auth.auth_context import get_authenticated_user, get_validated_oauth_token
import json
def build_event_data(repo, extra_data={}, subpage=None):
repo_string = '%s/%s' % (repo.namespace_user.username, repo.name)
homepage = '%s://%s/repository/%s' % (app.config['PREFERRED_URL_SCHEME'],
app.config['SERVER_HOSTNAME'],
repo_string)
if subpage:
if not subpage.startswith('/'):
subpage = '/' + subpage
homepage = homepage + subpage
event_data = {
'repository': repo_string,
'namespace': repo.namespace_user.username,
'name': repo.name,
'docker_url': '%s/%s' % (app.config['SERVER_HOSTNAME'], repo_string),
'homepage': homepage,
'visibility': repo.visibility.name
}
event_data.update(extra_data)
return event_data
def build_notification_data(notification, event_data, performer_data=None):
if not performer_data:
performer_data = {}
oauth_token = get_validated_oauth_token()
if oauth_token:
performer_data['oauth_token_id'] = oauth_token.id
performer_data['oauth_token_application_id'] = oauth_token.application.client_id
performer_data['oauth_token_application'] = oauth_token.application.name
performer_user = get_authenticated_user()
if performer_user:
performer_data['entity_id'] = performer_user.id
performer_data['entity_name'] = performer_user.username
return {
'notification_uuid': notification.uuid,
'event_data': event_data,
'performer_data': performer_data,
}
def spawn_notification(repo, event_name, extra_data={}, subpage=None, pathargs=[],
performer_data=None):
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
notifications = model.list_repo_notifications(repo.namespace_user.username, repo.name,
event_name=event_name)
for notification in list(notifications):
notification_data = build_notification_data(notification, event_data, performer_data)
path = [repo.namespace_user.username, repo.name, event_name] + pathargs
notification_queue.put(path, json.dumps(notification_data))