added zipkin to recommendation service
This commit is contained in:
parent
cb3eece194
commit
ad1f47a65a
7 changed files with 131 additions and 126 deletions
|
@ -15,23 +15,36 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import grpc
|
import grpc
|
||||||
|
import os
|
||||||
|
|
||||||
import demo_pb2
|
import demo_pb2
|
||||||
import demo_pb2_grpc
|
import demo_pb2_grpc
|
||||||
|
|
||||||
|
from opencensus.trace.tracer import Tracer
|
||||||
|
from opencensus.ext.grpc import client_interceptor
|
||||||
|
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
|
||||||
|
from opencensus.trace.samplers import AlwaysOnSampler
|
||||||
|
|
||||||
from logger import getJSONLogger
|
from logger import getJSONLogger
|
||||||
logger = getJSONLogger('emailservice-client')
|
logger = getJSONLogger('emailservice-client')
|
||||||
|
|
||||||
from opencensus.trace.tracer import Tracer
|
# Setup Zipkin exporter
|
||||||
from opencensus.trace.exporters import stackdriver_exporter
|
try:
|
||||||
from opencensus.trace.ext.grpc import client_interceptor
|
zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '')
|
||||||
|
if zipkin_service_addr == "":
|
||||||
try:
|
logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=<host>:<port> to enable.")
|
||||||
exporter = stackdriver_exporter.StackdriverExporter()
|
raise KeyError()
|
||||||
tracer = Tracer(exporter=exporter)
|
host, port = zipkin_service_addr.split(":")
|
||||||
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(tracer, host_port='0.0.0.0:8080')
|
ze = ZipkinExporter(service_name="recommendationservice-client",
|
||||||
except:
|
host_name=host,
|
||||||
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor()
|
port=port,
|
||||||
|
endpoint='/api/v2/spans')
|
||||||
|
sampler = AlwaysOnSampler()
|
||||||
|
tracer = Tracer(exporter=ze, sampler=sampler)
|
||||||
|
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(sampler, ze)
|
||||||
|
logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr)
|
||||||
|
except KeyError:
|
||||||
|
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor()
|
||||||
|
|
||||||
def send_confirmation_email(email, order):
|
def send_confirmation_email(email, order):
|
||||||
channel = grpc.insecure_channel('0.0.0.0:8080')
|
channel = grpc.insecure_channel('0.0.0.0:8080')
|
||||||
|
@ -49,3 +62,4 @@ def send_confirmation_email(email, order):
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logger.info('Client for email service.')
|
logger.info('Client for email service.')
|
||||||
|
|
||||||
|
|
|
@ -33,35 +33,28 @@ from opencensus.ext.grpc import server_interceptor
|
||||||
from opencensus.trace.tracer import Tracer
|
from opencensus.trace.tracer import Tracer
|
||||||
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
|
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
|
||||||
from opencensus.trace.samplers import AlwaysOnSampler
|
from opencensus.trace.samplers import AlwaysOnSampler
|
||||||
from opencensus.trace import config_integration
|
|
||||||
|
|
||||||
# import googleclouddebugger
|
|
||||||
import googlecloudprofiler
|
|
||||||
|
|
||||||
zipkin_service_addr = os.environ["ZIPKIN_SERVICE_ADDR"]
|
|
||||||
host, port = zipkin_service_addr.split(":")
|
|
||||||
ze = ZipkinExporter(service_name="emailservice-server",
|
|
||||||
host_name=host,
|
|
||||||
port=port,
|
|
||||||
endpoint='/api/v2/spans')
|
|
||||||
sampler = AlwaysOnSampler()
|
|
||||||
tracer = Tracer(exporter=ze, sampler=sampler)
|
|
||||||
|
|
||||||
try:
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze)
|
|
||||||
except:
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor()
|
|
||||||
|
|
||||||
from logger import getJSONLogger
|
from logger import getJSONLogger
|
||||||
logger = getJSONLogger('emailservice-server')
|
logger = getJSONLogger('emailservice-server')
|
||||||
|
|
||||||
# log the SPAN on the console
|
# Setup Zipkin exporter
|
||||||
config_integration.trace_integrations(['logging'])
|
try:
|
||||||
|
zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '')
|
||||||
# uncomment to create a test span
|
if zipkin_service_addr == "":
|
||||||
# with tracer.span(name='hello'):
|
logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=<host>:<port> to enable.")
|
||||||
# logger.warning('In the span')
|
raise KeyError()
|
||||||
|
host, port = zipkin_service_addr.split(":")
|
||||||
|
ze = ZipkinExporter(service_name="emailservice-server",
|
||||||
|
host_name=host,
|
||||||
|
port=port,
|
||||||
|
endpoint='/api/v2/spans')
|
||||||
|
sampler = AlwaysOnSampler()
|
||||||
|
tracer = Tracer(exporter=ze, sampler=sampler)
|
||||||
|
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze)
|
||||||
|
logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr)
|
||||||
|
except KeyError:
|
||||||
|
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor()
|
||||||
|
|
||||||
# Loads confirmation email template from file
|
# Loads confirmation email template from file
|
||||||
env = Environment(
|
env = Environment(
|
||||||
|
@ -179,24 +172,6 @@ def initStackdriverProfiling():
|
||||||
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
|
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
|
||||||
return
|
return
|
||||||
|
|
||||||
def initZipkinTrace(zipkin_service_addr):
|
|
||||||
host, port = zipkin_service_addr.split(":")
|
|
||||||
ze = ZipkinExporter(service_name="emailservice-server",
|
|
||||||
host_name=host,
|
|
||||||
port=port,
|
|
||||||
endpoint='/api/v2/spans')
|
|
||||||
sampler = AlwaysOnSampler()
|
|
||||||
tracer = Tracer(exporter=ze, sampler=sampler)
|
|
||||||
|
|
||||||
try:
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze)
|
|
||||||
except:
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
logger.info("Successfully started Zipkin Traces to " + zipkin_service_addr)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logger.info('starting the email service in dummy mode.')
|
logger.info('starting the email service in dummy mode.')
|
||||||
start(dummy_mode = True)
|
start(dummy_mode = True)
|
||||||
|
|
24
src/recommendationservice/README.md
Normal file
24
src/recommendationservice/README.md
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
# Recommendation Service
|
||||||
|
|
||||||
|
This is the recommendation service in Python 2.7
|
||||||
|
|
||||||
|
## Local test
|
||||||
|
|
||||||
|
start a docker image and run commands to importe requirement and start the app:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -ti --rm -v $(pwd):/hipstershop python:2.7-slim bash
|
||||||
|
|
||||||
|
# install tools
|
||||||
|
cd /hipstershop/src/recommendationservice
|
||||||
|
apt-get update -qqy
|
||||||
|
apt-get -qqy install wget g++ vim netcat
|
||||||
|
pip install pip-tools
|
||||||
|
|
||||||
|
# install python dependencies
|
||||||
|
pip-compile --output-file=requirements.txt requirements.in
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
# run the app
|
||||||
|
python recommendation_server.py
|
||||||
|
```
|
|
@ -16,16 +16,36 @@
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import grpc
|
import grpc
|
||||||
|
import os
|
||||||
import demo_pb2
|
import demo_pb2
|
||||||
import demo_pb2_grpc
|
import demo_pb2_grpc
|
||||||
|
|
||||||
from opencensus.trace.tracer import Tracer
|
from opencensus.trace.tracer import Tracer
|
||||||
from opencensus.trace.exporters import stackdriver_exporter
|
from opencensus.ext.grpc import client_interceptor
|
||||||
from opencensus.trace.ext.grpc import client_interceptor
|
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
|
||||||
|
from opencensus.trace.samplers import AlwaysOnSampler
|
||||||
|
|
||||||
from logger import getJSONLogger
|
from logger import getJSONLogger
|
||||||
logger = getJSONLogger('recommendationservice-server')
|
logger = getJSONLogger('recommendationservice-server')
|
||||||
|
|
||||||
|
# Setup Zipkin exporter
|
||||||
|
try:
|
||||||
|
zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '')
|
||||||
|
if zipkin_service_addr == "":
|
||||||
|
logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=<host>:<port> to enable.")
|
||||||
|
raise KeyError()
|
||||||
|
host, port = zipkin_service_addr.split(":")
|
||||||
|
ze = ZipkinExporter(service_name="recommendationservice-client",
|
||||||
|
host_name=host,
|
||||||
|
port=port,
|
||||||
|
endpoint='/api/v2/spans')
|
||||||
|
sampler = AlwaysOnSampler()
|
||||||
|
tracer = Tracer(exporter=ze, sampler=sampler)
|
||||||
|
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(sampler, ze)
|
||||||
|
logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr)
|
||||||
|
except KeyError:
|
||||||
|
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# get port
|
# get port
|
||||||
if len(sys.argv) > 1:
|
if len(sys.argv) > 1:
|
||||||
|
@ -33,13 +53,6 @@ if __name__ == "__main__":
|
||||||
else:
|
else:
|
||||||
port = "8080"
|
port = "8080"
|
||||||
|
|
||||||
try:
|
|
||||||
exporter = stackdriver_exporter.StackdriverExporter()
|
|
||||||
tracer = Tracer(exporter=exporter)
|
|
||||||
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(tracer, host_port='localhost:'+port)
|
|
||||||
except:
|
|
||||||
tracer_interceptor = client_interceptor.OpenCensusClientInterceptor()
|
|
||||||
|
|
||||||
# set up server stub
|
# set up server stub
|
||||||
channel = grpc.insecure_channel('localhost:'+port)
|
channel = grpc.insecure_channel('localhost:'+port)
|
||||||
channel = grpc.intercept_channel(channel, tracer_interceptor)
|
channel = grpc.intercept_channel(channel, tracer_interceptor)
|
||||||
|
@ -48,4 +61,4 @@ if __name__ == "__main__":
|
||||||
request = demo_pb2.ListRecommendationsRequest(user_id="test", product_ids=["test"])
|
request = demo_pb2.ListRecommendationsRequest(user_id="test", product_ids=["test"])
|
||||||
# make call to server
|
# make call to server
|
||||||
response = stub.ListRecommendations(request)
|
response = stub.ListRecommendations(request)
|
||||||
logger.info(response)
|
logger.info(response)
|
|
@ -20,13 +20,19 @@ import time
|
||||||
import traceback
|
import traceback
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
|
|
||||||
import googleclouddebugger
|
# import googleclouddebugger
|
||||||
import googlecloudprofiler
|
# import googlecloudprofiler
|
||||||
import grpc
|
import grpc
|
||||||
from opencensus.trace.exporters import print_exporter
|
# from opencensus.trace.exporters import print_exporter
|
||||||
from opencensus.trace.exporters import stackdriver_exporter
|
# from opencensus.trace.exporters import stackdriver_exporter
|
||||||
from opencensus.trace.ext.grpc import server_interceptor
|
# from opencensus.trace.ext.grpc import server_interceptor
|
||||||
from opencensus.trace.samplers import always_on
|
# from opencensus.trace.samplers import always_on
|
||||||
|
|
||||||
|
from opencensus.ext.grpc import server_interceptor
|
||||||
|
from opencensus.trace.tracer import Tracer
|
||||||
|
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
|
||||||
|
from opencensus.trace.samplers import AlwaysOnSampler
|
||||||
|
from opencensus.trace import config_integration
|
||||||
|
|
||||||
import demo_pb2
|
import demo_pb2
|
||||||
import demo_pb2_grpc
|
import demo_pb2_grpc
|
||||||
|
@ -36,30 +42,23 @@ from grpc_health.v1 import health_pb2_grpc
|
||||||
from logger import getJSONLogger
|
from logger import getJSONLogger
|
||||||
logger = getJSONLogger('recommendationservice-server')
|
logger = getJSONLogger('recommendationservice-server')
|
||||||
|
|
||||||
def initStackdriverProfiling():
|
# Setup Zipkin exporter
|
||||||
project_id = None
|
try:
|
||||||
try:
|
zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '')
|
||||||
project_id = os.environ["GCP_PROJECT_ID"]
|
if zipkin_service_addr == "":
|
||||||
except KeyError:
|
logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=<host>:<port> to enable.")
|
||||||
# Environment variable not set
|
raise KeyError()
|
||||||
pass
|
host, port = zipkin_service_addr.split(":")
|
||||||
|
ze = ZipkinExporter(service_name="recommendationservice-server",
|
||||||
for retry in xrange(1,4):
|
host_name=host,
|
||||||
try:
|
port=port,
|
||||||
if project_id:
|
endpoint='/api/v2/spans')
|
||||||
googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0, project_id=project_id)
|
sampler = AlwaysOnSampler()
|
||||||
else:
|
tracer = Tracer(exporter=ze, sampler=sampler)
|
||||||
googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0)
|
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze)
|
||||||
logger.info("Successfully started Stackdriver Profiler.")
|
logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr)
|
||||||
return
|
except KeyError:
|
||||||
except (BaseException) as exc:
|
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor()
|
||||||
logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc))
|
|
||||||
if (retry < 4):
|
|
||||||
logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10))
|
|
||||||
time.sleep (1)
|
|
||||||
else:
|
|
||||||
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
|
|
||||||
return
|
|
||||||
|
|
||||||
class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
|
class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
|
||||||
def ListRecommendations(self, request, context):
|
def ListRecommendations(self, request, context):
|
||||||
|
@ -88,34 +87,6 @@ class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
logger.info("initializing recommendationservice")
|
logger.info("initializing recommendationservice")
|
||||||
|
|
||||||
try:
|
|
||||||
enable_profiler = os.environ["ENABLE_PROFILER"]
|
|
||||||
if enable_profiler != "1":
|
|
||||||
raise KeyError()
|
|
||||||
else:
|
|
||||||
initStackdriverProfiling()
|
|
||||||
except KeyError:
|
|
||||||
logger.info("Skipping Stackdriver Profiler Python agent initialization. Set environment variable ENABLE_PROFILER=1 to enable.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
sampler = always_on.AlwaysOnSampler()
|
|
||||||
exporter = stackdriver_exporter.StackdriverExporter(
|
|
||||||
project_id=os.environ.get('GCP_PROJECT_ID'),
|
|
||||||
transport=AsyncTransport)
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, exporter)
|
|
||||||
except:
|
|
||||||
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor()
|
|
||||||
|
|
||||||
try:
|
|
||||||
googleclouddebugger.enable(
|
|
||||||
module='recommendationserver',
|
|
||||||
version='1.0.0'
|
|
||||||
)
|
|
||||||
except Exception, err:
|
|
||||||
logger.error("could not enable debugger")
|
|
||||||
logger.error(traceback.print_exc())
|
|
||||||
pass
|
|
||||||
|
|
||||||
port = os.environ.get('PORT', "8080")
|
port = os.environ.get('PORT', "8080")
|
||||||
catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
|
catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
|
||||||
if catalog_addr == "":
|
if catalog_addr == "":
|
||||||
|
|
|
@ -2,6 +2,8 @@ google-api-core==1.6.0
|
||||||
google-python-cloud-debugger==2.9
|
google-python-cloud-debugger==2.9
|
||||||
grpcio-health-checking==1.13.0
|
grpcio-health-checking==1.13.0
|
||||||
grpcio==1.16.1
|
grpcio==1.16.1
|
||||||
opencensus[stackdriver]==0.1.10
|
opencensus==0.7.6
|
||||||
|
opencensus-ext-zipkin==0.2.2
|
||||||
|
opencensus-ext-grpc==0.7.1
|
||||||
python-json-logger==0.1.9
|
python-json-logger==0.1.9
|
||||||
google-cloud-profiler==1.0.8
|
google-cloud-profiler==1.0.8
|
||||||
|
|
|
@ -2,25 +2,28 @@
|
||||||
# This file is autogenerated by pip-compile
|
# This file is autogenerated by pip-compile
|
||||||
# To update, run:
|
# To update, run:
|
||||||
#
|
#
|
||||||
# pip-compile --output-file requirements.txt requirements.in
|
# pip-compile --output-file=requirements.txt requirements.in
|
||||||
#
|
#
|
||||||
cachetools==3.1.0 # via google-auth
|
cachetools==3.1.0 # via google-auth
|
||||||
certifi==2018.11.29 # via requests
|
certifi==2018.11.29 # via requests
|
||||||
chardet==3.0.4 # via requests
|
chardet==3.0.4 # via requests
|
||||||
google-api-core[grpc]==1.6.0
|
enum34==1.1.6 # via grpcio
|
||||||
|
futures==3.3.0 # via google-api-core, grpcio
|
||||||
|
google-api-core==1.6.0
|
||||||
google-api-python-client==1.7.8 # via google-cloud-profiler, google-python-cloud-debugger
|
google-api-python-client==1.7.8 # via google-cloud-profiler, google-python-cloud-debugger
|
||||||
google-auth-httplib2==0.0.3 # via google-api-python-client, google-cloud-profiler, google-python-cloud-debugger
|
google-auth-httplib2==0.0.3 # via google-api-python-client, google-cloud-profiler, google-python-cloud-debugger
|
||||||
google-auth==1.6.2 # via google-api-core, google-api-python-client, google-auth-httplib2, google-cloud-profiler, google-python-cloud-debugger
|
google-auth==1.6.2 # via google-api-core, google-api-python-client, google-auth-httplib2, google-cloud-profiler, google-python-cloud-debugger
|
||||||
google-cloud-core==0.29.1 # via google-cloud-trace
|
|
||||||
google-cloud-profiler==1.0.8
|
google-cloud-profiler==1.0.8
|
||||||
google-cloud-trace==0.20.2 # via opencensus
|
|
||||||
google-python-cloud-debugger==2.9
|
google-python-cloud-debugger==2.9
|
||||||
googleapis-common-protos==1.5.6 # via google-api-core
|
googleapis-common-protos==1.5.6 # via google-api-core
|
||||||
grpcio-health-checking==1.13.0
|
grpcio-health-checking==1.13.0
|
||||||
grpcio==1.16.1
|
grpcio==1.16.1
|
||||||
httplib2==0.12.0 # via google-api-python-client, google-auth-httplib2
|
httplib2==0.12.0 # via google-api-python-client, google-auth-httplib2
|
||||||
idna==2.8 # via requests
|
idna==2.8 # via requests
|
||||||
opencensus[stackdriver]==0.1.10
|
opencensus-context==0.1.1 # via opencensus
|
||||||
|
opencensus-ext-grpc==0.7.1
|
||||||
|
opencensus-ext-zipkin==0.2.2
|
||||||
|
opencensus==0.7.6
|
||||||
protobuf==3.6.1 # via google-api-core, google-cloud-profiler, googleapis-common-protos, grpcio-health-checking
|
protobuf==3.6.1 # via google-api-core, google-cloud-profiler, googleapis-common-protos, grpcio-health-checking
|
||||||
pyasn1-modules==0.2.4 # via google-auth
|
pyasn1-modules==0.2.4 # via google-auth
|
||||||
pyasn1==0.4.5 # via pyasn1-modules, rsa
|
pyasn1==0.4.5 # via pyasn1-modules, rsa
|
||||||
|
@ -32,3 +35,6 @@ rsa==4.0 # via google-auth
|
||||||
six==1.12.0 # via google-api-core, google-api-python-client, google-auth, google-python-cloud-debugger, grpcio, protobuf
|
six==1.12.0 # via google-api-core, google-api-python-client, google-auth, google-python-cloud-debugger, grpcio, protobuf
|
||||||
uritemplate==3.0.0 # via google-api-python-client
|
uritemplate==3.0.0 # via google-api-python-client
|
||||||
urllib3==1.24.1 # via requests
|
urllib3==1.24.1 # via requests
|
||||||
|
|
||||||
|
# The following packages are considered to be unsafe in a requirements file:
|
||||||
|
# setuptools
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue