Write our users to Marketo as leads.
This commit is contained in:
parent
013e27f7d5
commit
f04b018805
11 changed files with 250 additions and 6 deletions
63
util/asyncwrapper.py
Normal file
63
util/asyncwrapper.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
import queue
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from concurrent.futures import Executor, Future, CancelledError
|
||||
|
||||
|
||||
class AsyncExecutorWrapper(object):
|
||||
""" This class will wrap a syncronous library transparently in a way which
|
||||
will move all calls off to an asynchronous Executor, and will change all
|
||||
returned values to be Future objects.
|
||||
"""
|
||||
SYNC_FLAG_FIELD = '__AsyncExecutorWrapper__sync__'
|
||||
|
||||
def __init__(self, delegate, executor):
|
||||
""" Wrap the specified synchronous delegate instance, and submit() all
|
||||
method calls to the specified Executor instance.
|
||||
"""
|
||||
self._delegate = delegate
|
||||
self._executor = executor
|
||||
|
||||
def __getattr__(self, attr_name):
|
||||
maybe_callable = getattr(self._delegate, attr_name) # Will raise proper attribute error
|
||||
if callable(maybe_callable):
|
||||
# Build a callable which when executed places the request
|
||||
# onto a queue
|
||||
@wraps(maybe_callable)
|
||||
def wrapped_method(*args, **kwargs):
|
||||
if getattr(maybe_callable, self.SYNC_FLAG_FIELD, False):
|
||||
sync_result = Future()
|
||||
try:
|
||||
sync_result.set_result(maybe_callable(*args, **kwargs))
|
||||
except Exception as ex:
|
||||
sync_result.set_exception(ex)
|
||||
return sync_result
|
||||
|
||||
try:
|
||||
return self._executor.submit(maybe_callable, *args, **kwargs)
|
||||
except queue.Full as ex:
|
||||
queue_full = Future()
|
||||
queue_full.set_exception(ex)
|
||||
return queue_full
|
||||
|
||||
return wrapped_method
|
||||
else:
|
||||
return maybe_callable
|
||||
|
||||
@classmethod
|
||||
def sync(cls, f):
|
||||
""" Annotate the given method to flag it as synchronous so that AsyncExecutorWrapper
|
||||
will return the result immediately without submitting it to the executor.
|
||||
"""
|
||||
setattr(f, cls.SYNC_FLAG_FIELD, True)
|
||||
return f
|
||||
|
||||
|
||||
class NullExecutor(Executor):
|
||||
""" Executor instance which always returns a Future completed with a
|
||||
CancelledError exception. """
|
||||
def submit(self, _, *args, **kwargs):
|
||||
always_fail = Future()
|
||||
always_fail.set_exception(CancelledError('Null executor always fails.'))
|
||||
return always_fail
|
137
util/saas/useranalytics.py
Normal file
137
util/saas/useranalytics.py
Normal file
|
@ -0,0 +1,137 @@
|
|||
import logging
|
||||
|
||||
from hashlib import sha1
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from marketorestpython.client import MarketoClient
|
||||
|
||||
from util.asyncwrapper import AsyncExecutorWrapper, NullExecutor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LeadNotFoundException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class _MarketoAnalyticsClient(object):
|
||||
""" User analytics implementation which will report user changes to the
|
||||
Marketo API.
|
||||
"""
|
||||
def __init__(self, marketo_client, munchkin_private_key, lead_source):
|
||||
""" Instantiate with the given marketorestpython.client, the Marketo
|
||||
Munchkin Private Key, and the Lead Source that we want to set when we
|
||||
create new lead records in Marketo.
|
||||
"""
|
||||
self._marketo = marketo_client
|
||||
self._munchkin_private_key = munchkin_private_key
|
||||
self._lead_source = lead_source
|
||||
|
||||
def create_lead(self, email, username):
|
||||
lead_data = dict(
|
||||
email=email,
|
||||
Quay_Username__c=username,
|
||||
leadSource='Web - Product Trial',
|
||||
Lead_Source_Detail__c=self._lead_source,
|
||||
)
|
||||
self._marketo.create_update_leads(
|
||||
action='createOrUpdate',
|
||||
leads=[lead_data],
|
||||
asyncProcessing=True,
|
||||
lookupField='email',
|
||||
)
|
||||
|
||||
def _find_leads_by_email(self, email):
|
||||
# Fetch the existing user from the database by email
|
||||
found = self._marketo.get_multiple_leads_by_filter_type(
|
||||
filterType='email',
|
||||
filterValues=[email],
|
||||
)
|
||||
|
||||
if not found:
|
||||
raise LeadNotFoundException('No lead found with email: {}'.format(email))
|
||||
|
||||
return found
|
||||
|
||||
def change_email(self, old_email, new_email):
|
||||
found = self._find_leads_by_email(old_email)
|
||||
|
||||
# Update using their user id.
|
||||
updated = [dict(id=lead['id'], email=new_email) for lead in found]
|
||||
self._marketo.create_update_leads(
|
||||
action='updateOnly',
|
||||
leads=updated,
|
||||
asyncProcessing=True,
|
||||
lookupField='id',
|
||||
)
|
||||
|
||||
def change_username(self, email, new_username):
|
||||
found = self._find_leads_by_email(email)
|
||||
|
||||
# Update using their user id.
|
||||
updated = [dict(id=lead['id'], Quay_Username__c=new_username) for lead in found]
|
||||
self._marketo.create_update_leads(
|
||||
action='updateOnly',
|
||||
leads=updated,
|
||||
asyncProcessing=True,
|
||||
lookupField='id',
|
||||
)
|
||||
|
||||
@AsyncExecutorWrapper.sync
|
||||
def get_user_analytics_metadata(self, user_obj):
|
||||
""" Return a list of properties that should be added to the user object to allow
|
||||
analytics associations.
|
||||
"""
|
||||
if not self._munchkin_private_key:
|
||||
return dict()
|
||||
|
||||
marketo_user_hash = sha1(self._munchkin_private_key)
|
||||
marketo_user_hash.update(user_obj.email)
|
||||
|
||||
return dict(
|
||||
marketo_user_hash=marketo_user_hash.hexdigest(),
|
||||
)
|
||||
|
||||
|
||||
class UserAnalytics(object):
|
||||
def __init__(self, app=None):
|
||||
self.app = app
|
||||
if app is not None:
|
||||
self.state = self.init_app(app)
|
||||
else:
|
||||
self.state = None
|
||||
|
||||
def init_app(self, app):
|
||||
analytics_type = app.config.get('USER_ANALYTICS_TYPE', 'FakeAnalytics')
|
||||
|
||||
marketo_munchkin_id = ''
|
||||
marketo_munchkin_private_key = ''
|
||||
marketo_client_id = ''
|
||||
marketo_client_secret = ''
|
||||
marketo_lead_source = ''
|
||||
executor = NullExecutor()
|
||||
|
||||
if analytics_type == 'Marketo':
|
||||
marketo_munchkin_id = app.config['MARKETO_MUNCHKIN_ID']
|
||||
marketo_munchkin_private_key = app.config['MARKETO_MUNCHKIN_PRIVATE_KEY']
|
||||
marketo_client_id = app.config['MARKETO_CLIENT_ID']
|
||||
marketo_client_secret = app.config['MARKETO_CLIENT_SECRET']
|
||||
marketo_lead_source = app.config['MARKETO_LEAD_SOURCE']
|
||||
|
||||
logger.debug('Initializing marketo with keys: %s %s %s', marketo_munchkin_id,
|
||||
marketo_client_id, marketo_client_secret)
|
||||
|
||||
executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
marketo_client = MarketoClient(marketo_munchkin_id, marketo_client_id, marketo_client_secret)
|
||||
client_wrapper = _MarketoAnalyticsClient(marketo_client, marketo_munchkin_private_key,
|
||||
marketo_lead_source)
|
||||
user_analytics = AsyncExecutorWrapper(client_wrapper, executor)
|
||||
|
||||
# register extension with app
|
||||
app.extensions = getattr(app, 'extensions', {})
|
||||
app.extensions['user_analytics'] = user_analytics
|
||||
return user_analytics
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.state, name, None)
|
Reference in a new issue