diff --git a/application.py b/application.py index 26101f50b..4cde6d9e8 100644 --- a/application.py +++ b/application.py @@ -12,6 +12,7 @@ from endpoints.web import web from endpoints.tags import tags from endpoints.registry import registry from endpoints.webhooks import webhooks +from endpoints.realtime import realtime logger = logging.getLogger(__name__) @@ -26,6 +27,7 @@ application.register_blueprint(tags, url_prefix='/v1') application.register_blueprint(registry, url_prefix='/v1') application.register_blueprint(api, url_prefix='/api') application.register_blueprint(webhooks, url_prefix='/webhooks') +application.register_blueprint(realtime, url_prefix='/realtime') # Remove this for prod config application.debug = True diff --git a/endpoints/realtime.py b/endpoints/realtime.py new file mode 100644 index 000000000..97388056f --- /dev/null +++ b/endpoints/realtime.py @@ -0,0 +1,96 @@ +import logging +import redis + +from functools import wraps +from flask import request, make_response, Blueprint, abort, Response +from flask.ext.login import current_user, logout_user +from data import model +from app import app + +logger = logging.getLogger(__name__) + +realtime = Blueprint('realtime', __name__) + +def api_login_required(f): + @wraps(f) + def decorated_view(*args, **kwargs): + if not current_user.is_authenticated(): + abort(401) + + if (current_user and current_user.db_user() and + current_user.db_user().organization): + abort(401) + + if (current_user and current_user.db_user() and + current_user.db_user().robot): + abort(401) + + return f(*args, **kwargs) + return decorated_view + + +# Based off of the SSE flask snippet here: http://flask.pocoo.org/snippets/116/ + +class ServerSentEvent(object): + def __init__(self, data): + self.data = data + self.event = None + self.id = None + self.desc_map = { + self.data : "data", + self.event : "event", + self.id : "id" + } + + def encode(self): + if not self.data: + return "" + lines = ["%s: %s" % (v, k) + for k, v in self.desc_map.iteritems() if k] + + return "%s\n\n" % "\n".join(lines) + +# The current subscriptions +subscriptions = [] + +@realtime.route("/") +def index(): + debug_template = """ + + + + +

Server sent events

+
+ + + + """ + return(debug_template) + + +@realtime.route("/subscribe") +@api_login_required +def subscribe(): + def gen(): + q = Queue() + subscriptions.append(q) + try: + while True: + result = q.get() + ev = ServerSentEvent(str(result)) + yield ev.encode() + except GeneratorExit: + subscriptions.remove(q) + + return Response(gen(), mimetype="text/event-stream")