import redis import json import threading import logging logger = logging.getLogger(__name__) class UserEventBuilder(object): """ Defines a helper class for constructing UserEvent and UserEventListener instances. """ def __init__(self, redis_config): self._redis_config = redis_config def get_event(self, username): return UserEvent(self._redis_config, username) def get_listener(self, username, events): return UserEventListener(self._redis_config, username, events) class UserEventsBuilderModule(object): def __init__(self, app=None): self.app = app if app is not None: self.state = self.init_app(app) else: self.state = None def init_app(self, app): redis_config = app.config.get('USER_EVENTS_REDIS') if not redis_config: # This is the old key name. redis_config = { 'host': app.config.get('USER_EVENTS_REDIS_HOSTNAME'), } user_events = UserEventBuilder(redis_config) # register extension with app app.extensions = getattr(app, 'extensions', {}) app.extensions['userevents'] = user_events return user_events def __getattr__(self, name): return getattr(self.state, name, None) class UserEvent(object): """ Defines a helper class for publishing to realtime user events as backed by Redis. """ def __init__(self, redis_config, username): self._redis = redis.StrictRedis(socket_connect_timeout=5, **redis_config) self._username = username @staticmethod def _user_event_key(username, event_id): return 'user/%s/events/%s' % (username, event_id) def publish_event_data_sync(self, event_id, data_obj): return self._redis.publish(self._user_event_key(self._username, event_id), json.dumps(data_obj)) def publish_event_data(self, event_id, data_obj): """ Publishes the serialized form of the data object for the given event. Note that this occurs in a thread to prevent blocking. """ def conduct(): try: self.publish_event_data_sync(event_id, data_obj) logger.debug('Published user event %s: %s', event_id, data_obj) except Exception: logger.exception('Could not publish user event') thread = threading.Thread(target=conduct) thread.start() class UserEventListener(object): """ Defines a helper class for subscribing to realtime user events as backed by Redis. """ def __init__(self, redis_config, username, events=set([])): channels = [self._user_event_key(username, e) for e in events] self._redis = redis.StrictRedis(socket_connect_timeout=5, **redis_config) self._pubsub = self._redis.pubsub() self._pubsub.subscribe(channels) @staticmethod def _user_event_key(username, event_id): return 'user/%s/events/%s' % (username, event_id) def event_stream(self): """ Starts listening for events on the channel(s), yielding for each event found. """ for item in self._pubsub.listen(): channel = item['channel'] event_id = channel.split('/')[3] # user/{username}/{events}/{id} data = None try: data = json.loads(item['data'] or '{}') except: pass if data: yield event_id, data def stop(self): """ Unsubscribes from the channel(s). Should be called once the connection has terminated. """ self._pubsub.unsubscribe()