Get the basic tutorial working completely, including reacting to server-side events
This commit is contained in:
parent
b7afc83204
commit
fa1bf94af1
20 changed files with 431 additions and 99 deletions
90
data/userevent.py
Normal file
90
data/userevent.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
import redis
|
||||
import json
|
||||
import threading
|
||||
|
||||
class UserEventBuilder(object):
|
||||
"""
|
||||
Defines a helper class for constructing UserEvent and UserEventListener
|
||||
instances.
|
||||
"""
|
||||
def __init__(self, redis_host):
|
||||
self._redis_host = redis_host
|
||||
|
||||
def get_event(self, username):
|
||||
return UserEvent(self._redis_host, username)
|
||||
|
||||
def get_listener(self, username, events):
|
||||
return UserEventListener(self._redis_host, username, events)
|
||||
|
||||
|
||||
class UserEvent(object):
|
||||
"""
|
||||
Defines a helper class for publishing to realtime user events
|
||||
as backed by Redis.
|
||||
"""
|
||||
def __init__(self, redis_host, username):
|
||||
self._redis = redis.StrictRedis(host=redis_host)
|
||||
self._username = username
|
||||
|
||||
@staticmethod
|
||||
def _user_event_key(username, event_id):
|
||||
return 'user/%s/events/%s' % (username, event_id)
|
||||
|
||||
def publish_event_data_sync(self, event_id, data_obj):
|
||||
return self._redis.publish(self._user_event_key(self._username, event_id), json.dumps(data_obj))
|
||||
|
||||
def publish_event_data(self, event_id, data_obj):
|
||||
"""
|
||||
Publishes the serialized form of the data object for the given event. Note that this occurs
|
||||
in a thread to prevent blocking.
|
||||
"""
|
||||
def conduct():
|
||||
try:
|
||||
self.publish_event_data_sync(event_id, data_obj)
|
||||
except Exception as e:
|
||||
print e
|
||||
|
||||
thread = threading.Thread(target=conduct)
|
||||
thread.start()
|
||||
|
||||
|
||||
class UserEventListener(object):
|
||||
"""
|
||||
Defines a helper class for subscribing to realtime user events as
|
||||
backed by Redis.
|
||||
"""
|
||||
def __init__(self, redis_host, username, events=set([])):
|
||||
channels = [self._user_event_key(username, e) for e in events]
|
||||
|
||||
self._redis = redis.StrictRedis(host=redis_host)
|
||||
self._pubsub = self._redis.pubsub()
|
||||
self._pubsub.subscribe(channels)
|
||||
|
||||
@staticmethod
|
||||
def _user_event_key(username, event_id):
|
||||
return 'user/%s/events/%s' % (username, event_id)
|
||||
|
||||
def event_stream(self):
|
||||
"""
|
||||
Starts listening for events on the channel(s), yielding for each event
|
||||
found.
|
||||
"""
|
||||
for item in self._pubsub.listen():
|
||||
channel = item['channel']
|
||||
event_id = channel.split('/')[3] # user/{username}/{events}/{id}
|
||||
data = None
|
||||
|
||||
try:
|
||||
data = json.loads(item['data'] or '{}')
|
||||
except:
|
||||
pass
|
||||
|
||||
if data:
|
||||
yield event_id, data
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Unsubscribes from the channel(s). Should be called once the connection
|
||||
has terminated.
|
||||
"""
|
||||
self._pubsub.unsubscribe()
|
Reference in a new issue