This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/userevent.py

111 lines
3 KiB
Python

import redis
import json
import threading
class UserEventBuilder(object):
"""
Defines a helper class for constructing UserEvent and UserEventListener
instances.
"""
def __init__(self, redis_host):
self._redis_host = redis_host
def get_event(self, username):
return UserEvent(self._redis_host, username)
def get_listener(self, username, events):
return UserEventListener(self._redis_host, username, events)
class UserEventsBuilderModule(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.state = self.init_app(app)
else:
self.state = None
def init_app(self, app):
redis_hostname = app.config.get('USER_EVENTS_REDIS_HOSTNAME')
user_events = UserEventBuilder(redis_hostname)
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['userevents'] = user_events
return user_events
def __getattr__(self, name):
return getattr(self.state, name, None)
class UserEvent(object):
"""
Defines a helper class for publishing to realtime user events
as backed by Redis.
"""
def __init__(self, redis_host, username):
self._redis = redis.StrictRedis(host=redis_host, socket_timeout=5)
self._username = username
@staticmethod
def _user_event_key(username, event_id):
return 'user/%s/events/%s' % (username, event_id)
def publish_event_data_sync(self, event_id, data_obj):
return self._redis.publish(self._user_event_key(self._username, event_id), json.dumps(data_obj))
def publish_event_data(self, event_id, data_obj):
"""
Publishes the serialized form of the data object for the given event. Note that this occurs
in a thread to prevent blocking.
"""
def conduct():
try:
self.publish_event_data_sync(event_id, data_obj)
except Exception as e:
print e
thread = threading.Thread(target=conduct)
thread.start()
class UserEventListener(object):
"""
Defines a helper class for subscribing to realtime user events as
backed by Redis.
"""
def __init__(self, redis_host, username, events=set([])):
channels = [self._user_event_key(username, e) for e in events]
self._redis = redis.StrictRedis(host=redis_host, socket_timeout=5)
self._pubsub = self._redis.pubsub()
self._pubsub.subscribe(channels)
@staticmethod
def _user_event_key(username, event_id):
return 'user/%s/events/%s' % (username, event_id)
def event_stream(self):
"""
Starts listening for events on the channel(s), yielding for each event
found.
"""
for item in self._pubsub.listen():
channel = item['channel']
event_id = channel.split('/')[3] # user/{username}/{events}/{id}
data = None
try:
data = json.loads(item['data'] or '{}')
except:
pass
if data:
yield event_id, data
def stop(self):
"""
Unsubscribes from the channel(s). Should be called once the connection
has terminated.
"""
self._pubsub.unsubscribe()