117 lines
		
	
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			117 lines
		
	
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import redis
 | |
| import json
 | |
| import threading
 | |
| 
 | |
| class UserEventBuilder(object):
 | |
|   """
 | |
|   Defines a helper class for constructing UserEvent and UserEventListener
 | |
|   instances.
 | |
|   """
 | |
|   def __init__(self, redis_config):
 | |
|     self._redis_config = redis_config
 | |
| 
 | |
|   def get_event(self, username):
 | |
|     return UserEvent(self._redis_config, username)
 | |
| 
 | |
|   def get_listener(self, username, events):
 | |
|     return UserEventListener(self._redis_config, username, events)
 | |
| 
 | |
| 
 | |
| class UserEventsBuilderModule(object):
 | |
|   def __init__(self, app=None):
 | |
|     self.app = app
 | |
|     if app is not None:
 | |
|       self.state = self.init_app(app)
 | |
|     else:
 | |
|       self.state = None
 | |
| 
 | |
|   def init_app(self, app):
 | |
|     redis_config = app.config.get('USER_EVENTS_REDIS')
 | |
|     if not redis_config:
 | |
|       # This is the old key name.
 | |
|       redis_config = {
 | |
|         'host': app.config.get('USER_EVENTS_REDIS_HOSTNAME'),
 | |
|       }
 | |
| 
 | |
|     user_events = UserEventBuilder(redis_config)
 | |
| 
 | |
|     # register extension with app
 | |
|     app.extensions = getattr(app, 'extensions', {})
 | |
|     app.extensions['userevents'] = user_events
 | |
|     return user_events
 | |
| 
 | |
|   def __getattr__(self, name):
 | |
|     return getattr(self.state, name, None)
 | |
| 
 | |
| 
 | |
| class UserEvent(object):
 | |
|   """
 | |
|   Defines a helper class for publishing to realtime user events
 | |
|   as backed by Redis.
 | |
|   """
 | |
|   def __init__(self, redis_config, username):
 | |
|     self._redis = redis.StrictRedis(socket_connect_timeout=5, **redis_config)
 | |
|     self._username = username
 | |
| 
 | |
|   @staticmethod
 | |
|   def _user_event_key(username, event_id):
 | |
|     return 'user/%s/events/%s' % (username, event_id)
 | |
| 
 | |
|   def publish_event_data_sync(self, event_id, data_obj):
 | |
|     return self._redis.publish(self._user_event_key(self._username, event_id), json.dumps(data_obj))
 | |
| 
 | |
|   def publish_event_data(self, event_id, data_obj):
 | |
|     """
 | |
|     Publishes the serialized form of the data object for the given event. Note that this occurs
 | |
|     in a thread to prevent blocking.
 | |
|     """
 | |
|     def conduct():
 | |
|       try:
 | |
|         self.publish_event_data_sync(event_id, data_obj)
 | |
|       except Exception as e:
 | |
|         print e
 | |
| 
 | |
|     thread = threading.Thread(target=conduct)
 | |
|     thread.start()
 | |
| 
 | |
| 
 | |
| class UserEventListener(object):
 | |
|   """
 | |
|   Defines a helper class for subscribing to realtime user events as
 | |
|   backed by Redis.
 | |
|   """
 | |
|   def __init__(self, redis_config, username, events=set([])):
 | |
|     channels = [self._user_event_key(username, e) for e in events]
 | |
| 
 | |
|     self._redis = redis.StrictRedis(socket_connect_timeout=5, **redis_config)
 | |
|     self._pubsub = self._redis.pubsub()
 | |
|     self._pubsub.subscribe(channels)
 | |
| 
 | |
|   @staticmethod
 | |
|   def _user_event_key(username, event_id):
 | |
|     return 'user/%s/events/%s' % (username, event_id)
 | |
| 
 | |
|   def event_stream(self):
 | |
|     """
 | |
|     Starts listening for events on the channel(s), yielding for each event
 | |
|     found.
 | |
|     """
 | |
|     for item in self._pubsub.listen():
 | |
|       channel = item['channel']
 | |
|       event_id = channel.split('/')[3] # user/{username}/{events}/{id}
 | |
|       data = None
 | |
| 
 | |
|       try:
 | |
|         data = json.loads(item['data'] or '{}')
 | |
|       except:
 | |
|         pass
 | |
| 
 | |
|       if data:
 | |
|         yield event_id, data
 | |
| 
 | |
|   def stop(self):
 | |
|     """
 | |
|     Unsubscribes from the channel(s). Should be called once the connection
 | |
|     has terminated.
 | |
|     """
 | |
|     self._pubsub.unsubscribe()
 |