diff --git a/config.py b/config.py
index 9be2c86a6..683e13d0f 100644
--- a/config.py
+++ b/config.py
@@ -7,6 +7,7 @@
 from storage.s3 import S3Storage
 from storage.local import LocalStorage
 from data.userfiles import UserRequestFiles
 from data.buildlogs import BuildLogs
+from data.userevent import UserEventBuilder
 from util import analytics
 from test.teststorage import FakeStorage, FakeUserfiles
@@ -91,6 +92,10 @@ class RedisBuildLogs(object):
   BUILDLOGS = BuildLogs('logs.quay.io')
 
 
+class UserEventConfig(object):
+  USER_EVENTS = UserEventBuilder('logs.quay.io')
+
+
 class StripeTestConfig(object):
   STRIPE_SECRET_KEY = 'sk_test_PEbmJCYrLXPW0VRLSnWUiZ7Y'
   STRIPE_PUBLISHABLE_KEY = 'pk_test_uEDHANKm9CHCvVa2DLcipGRh'
@@ -154,7 +159,8 @@ def logs_init_builder(level=logging.DEBUG):
 
 
 class TestConfig(FlaskConfig, FakeStorage, EphemeralDB, FakeUserfiles,
-                 FakeAnalytics, StripeTestConfig, RedisBuildLogs):
+                 FakeAnalytics, StripeTestConfig, RedisBuildLogs,
+                 UserEventConfig):
   LOGGING_CONFIG = logs_init_builder(logging.WARN)
   POPULATE_DB_TEST_DATA = True
   TESTING = True
@@ -164,7 +170,7 @@
 class DebugConfig(FlaskConfig, MailConfig, LocalStorage, SQLiteDB,
                   StripeTestConfig, MixpanelTestConfig, GitHubTestConfig,
                   DigitalOceanConfig, BuildNodeConfig, S3Userfiles,
-                  RedisBuildLogs):
+                  RedisBuildLogs, UserEventConfig):
   LOGGING_CONFIG = logs_init_builder()
   SEND_FILE_MAX_AGE_DEFAULT = 0
   POPULATE_DB_TEST_DATA = True
@@ -174,7 +180,8 @@ class DebugConfig(FlaskConfig, MailConfig, LocalStorage, SQLiteDB,
 
 class LocalHostedConfig(FlaskConfig, MailConfig, S3Storage, RDSMySQL,
                         StripeLiveConfig, MixpanelTestConfig,
                         GitHubProdConfig, DigitalOceanConfig,
-                        BuildNodeConfig, S3Userfiles, RedisBuildLogs):
+                        BuildNodeConfig, S3Userfiles, RedisBuildLogs,
+                        UserEventConfig):
   LOGGING_CONFIG = logs_init_builder()
   SEND_FILE_MAX_AGE_DEFAULT = 0
@@ -182,7 +189,7 @@ class LocalHostedConfig(FlaskConfig, MailConfig, S3Storage, RDSMySQL,
 
 class ProductionConfig(FlaskProdConfig, MailConfig, S3Storage, RDSMySQL,
                        StripeLiveConfig, MixpanelProdConfig,
                        GitHubProdConfig, DigitalOceanConfig, BuildNodeConfig,
-                       S3Userfiles, RedisBuildLogs):
+                       S3Userfiles, RedisBuildLogs, UserEventConfig):
   LOGGING_CONFIG = logs_init_builder()
   SEND_FILE_MAX_AGE_DEFAULT = 0
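
The new UserEventConfig mixin only has to expose a shared UserEventBuilder on the Flask config; everything else goes through app.config. A rough sketch of how that surfaces at runtime, assuming the app is wired up with from_object (the config class choice and the username below are illustrative, not part of this change):

    from flask import Flask

    import config

    app = Flask(__name__)
    app.config.from_object(config.DebugConfig)  # any config class that mixes in UserEventConfig

    # The shared builder is now reachable from any request handler.
    builder = app.config['USER_EVENTS']
    builder.get_event('example-user').publish_event_data('docker-cli', {'action': 'login'})
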
diff --git a/data/userevent.py b/data/userevent.py
new file mode 100644
index 000000000..3df24294e
--- /dev/null
+++ b/data/userevent.py
@@ -0,0 +1,90 @@
+import redis
+import json
+import threading
+
+class UserEventBuilder(object):
+  """
+  Defines a helper class for constructing UserEvent and UserEventListener
+  instances.
+  """
+  def __init__(self, redis_host):
+    self._redis_host = redis_host
+
+  def get_event(self, username):
+    return UserEvent(self._redis_host, username)
+
+  def get_listener(self, username, events):
+    return UserEventListener(self._redis_host, username, events)
+
+
+class UserEvent(object):
+  """
+  Defines a helper class for publishing to realtime user events
+  as backed by Redis.
+  """
+  def __init__(self, redis_host, username):
+    self._redis = redis.StrictRedis(host=redis_host)
+    self._username = username
+
+  @staticmethod
+  def _user_event_key(username, event_id):
+    return 'user/%s/events/%s' % (username, event_id)
+
+  def publish_event_data_sync(self, event_id, data_obj):
+    return self._redis.publish(self._user_event_key(self._username, event_id), json.dumps(data_obj))
+
+  def publish_event_data(self, event_id, data_obj):
+    """
+    Publishes the serialized form of the data object for the given event. Note that this occurs
+    in a thread to prevent blocking.
+    """
+    def conduct():
+      try:
+        self.publish_event_data_sync(event_id, data_obj)
+      except Exception as e:
+        print e
+
+    thread = threading.Thread(target=conduct)
+    thread.start()
+
+
+class UserEventListener(object):
+  """
+  Defines a helper class for subscribing to realtime user events as
+  backed by Redis.
+  """
+  def __init__(self, redis_host, username, events=set([])):
+    channels = [self._user_event_key(username, e) for e in events]
+
+    self._redis = redis.StrictRedis(host=redis_host)
+    self._pubsub = self._redis.pubsub()
+    self._pubsub.subscribe(channels)
+
+  @staticmethod
+  def _user_event_key(username, event_id):
+    return 'user/%s/events/%s' % (username, event_id)
+
+  def event_stream(self):
+    """
+    Starts listening for events on the channel(s), yielding for each event
+    found.
+    """
+    for item in self._pubsub.listen():
+      channel = item['channel']
+      event_id = channel.split('/')[3] # user/{username}/{events}/{id}
+      data = None
+
+      try:
+        data = json.loads(item['data'] or '{}')
+      except:
+        pass
+
+      if data:
+        yield event_id, data
+
+  def stop(self):
+    """
+    Unsubscribes from the channel(s). Should be called once the connection
+    has terminated.
+    """
+    self._pubsub.unsubscribe()
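
A minimal publish/subscribe round trip against the new module, for reference. The Redis host mirrors the one configured above; the username, the event id, and the single-process ordering are illustrative only (in practice the subscriber is a long-lived connection such as the realtime endpoint changed below):

    from data.userevent import UserEventBuilder

    builder = UserEventBuilder('logs.quay.io')

    # Publisher side: serializes the dict and publishes it on the
    # user/example-user/events/docker-cli channel. publish_event_data (without
    # _sync) does the same work from a background thread.
    builder.get_event('example-user').publish_event_data_sync('docker-cli', {'action': 'login'})

    # Subscriber side: blocks on the Redis pubsub connection and yields an
    # (event_id, data) tuple for each JSON payload that arrives.
    listener = builder.get_listener('example-user', set(['docker-cli']))
    try:
      for event_id, data in listener.event_stream():
        print event_id, data
    finally:
      listener.stop()
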
diff --git a/endpoints/index.py b/endpoints/index.py
index 998a0f94d..40a226270 100644
--- a/endpoints/index.py
+++ b/endpoints/index.py
@@ -5,9 +5,9 @@
 import urlparse
 
 from flask import request, make_response, jsonify, session, Blueprint
 from functools import wraps
-from data import model
+from data import model, userevent
 from data.queue import webhook_queue
-from app import mixpanel
+from app import mixpanel, app
 from auth.auth import (process_auth, get_authenticated_user, get_validated_token)
 from util.names import parse_repository_name
@@ -80,8 +80,16 @@ def create_user():
   if existing_user:
     verified = model.verify_user(username, password)
     if verified:
+      # Mark that the user was logged in.
+      event = app.config['USER_EVENTS'].get_event(username)
+      event.publish_event_data('docker-cli', {'action': 'login'})
+
       return make_response('Verified', 201)
     else:
+      # Mark that the login failed.
+      event = app.config['USER_EVENTS'].get_event(username)
+      event.publish_event_data('docker-cli', {'action': 'loginfailure'})
+
       abort(400, 'Invalid password.', issue='login-failure')
 
   else:
@@ -186,9 +194,21 @@ def create_repository(namespace, repository):
   }
 
   if get_authenticated_user():
-    mixpanel.track(get_authenticated_user().username, 'push_repo',
-                   extra_params)
-    metadata['username'] = get_authenticated_user().username
+    username = get_authenticated_user().username
+
+    mixpanel.track(username, 'push_repo', extra_params)
+    metadata['username'] = username
+
+    # Mark that the user has started pushing the repo.
+    user_data = {
+      'action': 'push_repo',
+      'repository': repository,
+      'namespace': namespace
+    }
+
+    event = app.config['USER_EVENTS'].get_event(username)
+    event.publish_event_data('docker-cli', user_data)
+
   else:
     mixpanel.track(get_validated_token().code, 'push_repo', extra_params)
     metadata['token'] = get_validated_token().friendly_name
@@ -222,6 +242,19 @@ def update_images(namespace, repository):
       updated_tags[image['Tag']] = image['id']
       model.set_image_checksum(image['id'], repo, image['checksum'])
 
+    if get_authenticated_user():
+      username = get_authenticated_user().username
+
+      # Mark that the user has pushed the repo.
+      user_data = {
+        'action': 'pushed_repo',
+        'repository': repository,
+        'namespace': namespace
+      }
+
+      event = app.config['USER_EVENTS'].get_event(username)
+      event.publish_event_data('docker-cli', user_data)
+
   # Generate a job for each webhook that has been added to this repo
   webhooks = model.list_webhooks(namespace, repository)
   for webhook in webhooks:
diff --git a/endpoints/realtime.py b/endpoints/realtime.py
index 97388056f..a4df130aa 100644
--- a/endpoints/realtime.py
+++ b/endpoints/realtime.py
@@ -1,10 +1,11 @@
 import logging
 import redis
+import json
 
 from functools import wraps
 from flask import request, make_response, Blueprint, abort, Response
 from flask.ext.login import current_user, logout_user
-from data import model
+from data import model, userevent
 from app import app
 
 logger = logging.getLogger(__name__)
@@ -29,31 +30,8 @@ def api_login_required(f):
 
   return decorated_view
 
 
-# Based off of the SSE flask snippet here: http://flask.pocoo.org/snippets/116/
-
-class ServerSentEvent(object):
-  def __init__(self, data):
-    self.data = data
-    self.event = None
-    self.id = None
-    self.desc_map = {
-      self.data : "data",
-      self.event : "event",
-      self.id : "id"
-    }
-
-  def encode(self):
-    if not self.data:
-      return ""
-    lines = ["%s: %s" % (v, k)
-             for k, v in self.desc_map.iteritems() if k]
-
-    return "%s\n\n" % "\n".join(lines)
-
-# The current subscriptions
-subscriptions = []
-
-@realtime.route("/")
+@realtime.route("/user/")
+@api_login_required
 def index():
   debug_template = """
@@ -65,7 +43,7 @@ def index():