From ad1f47a65afac6690c4c42d8c956f0339bd9d24f Mon Sep 17 00:00:00 2001 From: prune Date: Thu, 30 Jan 2020 09:01:03 -0500 Subject: [PATCH] added zipkin to recommendation service --- src/emailservice/email_client.py | 34 +++++--- src/emailservice/email_server.py | 59 ++++--------- src/recommendationservice/README.md | 24 +++++ src/recommendationservice/client.py | 33 ++++--- .../recommendation_server.py | 87 +++++++------------ src/recommendationservice/requirements.in | 4 +- src/recommendationservice/requirements.txt | 16 ++-- 7 files changed, 131 insertions(+), 126 deletions(-) create mode 100644 src/recommendationservice/README.md diff --git a/src/emailservice/email_client.py b/src/emailservice/email_client.py index c31f4ef..60c6026 100644 --- a/src/emailservice/email_client.py +++ b/src/emailservice/email_client.py @@ -15,23 +15,36 @@ # limitations under the License. import grpc +import os import demo_pb2 import demo_pb2_grpc +from opencensus.trace.tracer import Tracer +from opencensus.ext.grpc import client_interceptor +from opencensus.ext.zipkin.trace_exporter import ZipkinExporter +from opencensus.trace.samplers import AlwaysOnSampler + from logger import getJSONLogger logger = getJSONLogger('emailservice-client') -from opencensus.trace.tracer import Tracer -from opencensus.trace.exporters import stackdriver_exporter -from opencensus.trace.ext.grpc import client_interceptor - -try: - exporter = stackdriver_exporter.StackdriverExporter() - tracer = Tracer(exporter=exporter) - tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(tracer, host_port='0.0.0.0:8080') -except: - tracer_interceptor = client_interceptor.OpenCensusClientInterceptor() +# Setup Zipkin exporter +try: + zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '') + if zipkin_service_addr == "": + logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=: to enable.") + raise KeyError() + host, port = zipkin_service_addr.split(":") + ze = ZipkinExporter(service_name="recommendationservice-client", + host_name=host, + port=port, + endpoint='/api/v2/spans') + sampler = AlwaysOnSampler() + tracer = Tracer(exporter=ze, sampler=sampler) + tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(sampler, ze) + logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr) +except KeyError: + tracer_interceptor = client_interceptor.OpenCensusClientInterceptor() def send_confirmation_email(email, order): channel = grpc.insecure_channel('0.0.0.0:8080') @@ -49,3 +62,4 @@ def send_confirmation_email(email, order): if __name__ == '__main__': logger.info('Client for email service.') + diff --git a/src/emailservice/email_server.py b/src/emailservice/email_server.py index bcfecd1..ac322d8 100644 --- a/src/emailservice/email_server.py +++ b/src/emailservice/email_server.py @@ -33,35 +33,28 @@ from opencensus.ext.grpc import server_interceptor from opencensus.trace.tracer import Tracer from opencensus.ext.zipkin.trace_exporter import ZipkinExporter from opencensus.trace.samplers import AlwaysOnSampler -from opencensus.trace import config_integration -# import googleclouddebugger -import googlecloudprofiler - -zipkin_service_addr = os.environ["ZIPKIN_SERVICE_ADDR"] -host, port = zipkin_service_addr.split(":") -ze = ZipkinExporter(service_name="emailservice-server", - host_name=host, - port=port, - endpoint='/api/v2/spans') -sampler = AlwaysOnSampler() -tracer = Tracer(exporter=ze, sampler=sampler) - -try: - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze) -except: - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor() from logger import getJSONLogger logger = getJSONLogger('emailservice-server') -# log the SPAN on the console -config_integration.trace_integrations(['logging']) - -# uncomment to create a test span -# with tracer.span(name='hello'): -# logger.warning('In the span') - +# Setup Zipkin exporter +try: + zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '') + if zipkin_service_addr == "": + logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=: to enable.") + raise KeyError() + host, port = zipkin_service_addr.split(":") + ze = ZipkinExporter(service_name="emailservice-server", + host_name=host, + port=port, + endpoint='/api/v2/spans') + sampler = AlwaysOnSampler() + tracer = Tracer(exporter=ze, sampler=sampler) + tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze) + logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr) +except KeyError: + tracer_interceptor = server_interceptor.OpenCensusServerInterceptor() # Loads confirmation email template from file env = Environment( @@ -179,24 +172,6 @@ def initStackdriverProfiling(): logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up") return -def initZipkinTrace(zipkin_service_addr): - host, port = zipkin_service_addr.split(":") - ze = ZipkinExporter(service_name="emailservice-server", - host_name=host, - port=port, - endpoint='/api/v2/spans') - sampler = AlwaysOnSampler() - tracer = Tracer(exporter=ze, sampler=sampler) - - try: - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze) - except: - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor() - - - - logger.info("Successfully started Zipkin Traces to " + zipkin_service_addr) - if __name__ == '__main__': logger.info('starting the email service in dummy mode.') start(dummy_mode = True) diff --git a/src/recommendationservice/README.md b/src/recommendationservice/README.md new file mode 100644 index 0000000..14399ac --- /dev/null +++ b/src/recommendationservice/README.md @@ -0,0 +1,24 @@ +# Recommendation Service + +This is the recommendation service in Python 2.7 + +## Local test + +start a docker image and run commands to importe requirement and start the app: + +```bash +docker run -ti --rm -v $(pwd):/hipstershop python:2.7-slim bash + +# install tools +cd /hipstershop/src/recommendationservice +apt-get update -qqy +apt-get -qqy install wget g++ vim netcat +pip install pip-tools + +# install python dependencies +pip-compile --output-file=requirements.txt requirements.in +pip install -r requirements.txt + +# run the app +python recommendation_server.py +``` \ No newline at end of file diff --git a/src/recommendationservice/client.py b/src/recommendationservice/client.py index 7894886..0713e33 100644 --- a/src/recommendationservice/client.py +++ b/src/recommendationservice/client.py @@ -16,16 +16,36 @@ import sys import grpc +import os import demo_pb2 import demo_pb2_grpc from opencensus.trace.tracer import Tracer -from opencensus.trace.exporters import stackdriver_exporter -from opencensus.trace.ext.grpc import client_interceptor +from opencensus.ext.grpc import client_interceptor +from opencensus.ext.zipkin.trace_exporter import ZipkinExporter +from opencensus.trace.samplers import AlwaysOnSampler from logger import getJSONLogger logger = getJSONLogger('recommendationservice-server') +# Setup Zipkin exporter +try: + zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '') + if zipkin_service_addr == "": + logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=: to enable.") + raise KeyError() + host, port = zipkin_service_addr.split(":") + ze = ZipkinExporter(service_name="recommendationservice-client", + host_name=host, + port=port, + endpoint='/api/v2/spans') + sampler = AlwaysOnSampler() + tracer = Tracer(exporter=ze, sampler=sampler) + tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(sampler, ze) + logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr) +except KeyError: + tracer_interceptor = client_interceptor.OpenCensusClientInterceptor() + if __name__ == "__main__": # get port if len(sys.argv) > 1: @@ -33,13 +53,6 @@ if __name__ == "__main__": else: port = "8080" - try: - exporter = stackdriver_exporter.StackdriverExporter() - tracer = Tracer(exporter=exporter) - tracer_interceptor = client_interceptor.OpenCensusClientInterceptor(tracer, host_port='localhost:'+port) - except: - tracer_interceptor = client_interceptor.OpenCensusClientInterceptor() - # set up server stub channel = grpc.insecure_channel('localhost:'+port) channel = grpc.intercept_channel(channel, tracer_interceptor) @@ -48,4 +61,4 @@ if __name__ == "__main__": request = demo_pb2.ListRecommendationsRequest(user_id="test", product_ids=["test"]) # make call to server response = stub.ListRecommendations(request) - logger.info(response) + logger.info(response) \ No newline at end of file diff --git a/src/recommendationservice/recommendation_server.py b/src/recommendationservice/recommendation_server.py index c4e8fe6..f5da9c8 100644 --- a/src/recommendationservice/recommendation_server.py +++ b/src/recommendationservice/recommendation_server.py @@ -20,13 +20,19 @@ import time import traceback from concurrent import futures -import googleclouddebugger -import googlecloudprofiler +# import googleclouddebugger +# import googlecloudprofiler import grpc -from opencensus.trace.exporters import print_exporter -from opencensus.trace.exporters import stackdriver_exporter -from opencensus.trace.ext.grpc import server_interceptor -from opencensus.trace.samplers import always_on +# from opencensus.trace.exporters import print_exporter +# from opencensus.trace.exporters import stackdriver_exporter +# from opencensus.trace.ext.grpc import server_interceptor +# from opencensus.trace.samplers import always_on + +from opencensus.ext.grpc import server_interceptor +from opencensus.trace.tracer import Tracer +from opencensus.ext.zipkin.trace_exporter import ZipkinExporter +from opencensus.trace.samplers import AlwaysOnSampler +from opencensus.trace import config_integration import demo_pb2 import demo_pb2_grpc @@ -36,30 +42,23 @@ from grpc_health.v1 import health_pb2_grpc from logger import getJSONLogger logger = getJSONLogger('recommendationservice-server') -def initStackdriverProfiling(): - project_id = None - try: - project_id = os.environ["GCP_PROJECT_ID"] - except KeyError: - # Environment variable not set - pass - - for retry in xrange(1,4): - try: - if project_id: - googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0, project_id=project_id) - else: - googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0) - logger.info("Successfully started Stackdriver Profiler.") - return - except (BaseException) as exc: - logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc)) - if (retry < 4): - logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10)) - time.sleep (1) - else: - logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up") - return +# Setup Zipkin exporter +try: + zipkin_service_addr = os.environ.get("ZIPKIN_SERVICE_ADDR", '') + if zipkin_service_addr == "": + logger.info("Skipping Zipkin traces initialization. Set environment variable ZIPKIN_SERVICE_ADDR=: to enable.") + raise KeyError() + host, port = zipkin_service_addr.split(":") + ze = ZipkinExporter(service_name="recommendationservice-server", + host_name=host, + port=port, + endpoint='/api/v2/spans') + sampler = AlwaysOnSampler() + tracer = Tracer(exporter=ze, sampler=sampler) + tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, ze) + logger.info("Zipkin traces enabled, sending to " + zipkin_service_addr) +except KeyError: + tracer_interceptor = server_interceptor.OpenCensusServerInterceptor() class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer): def ListRecommendations(self, request, context): @@ -88,34 +87,6 @@ class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer): if __name__ == "__main__": logger.info("initializing recommendationservice") - try: - enable_profiler = os.environ["ENABLE_PROFILER"] - if enable_profiler != "1": - raise KeyError() - else: - initStackdriverProfiling() - except KeyError: - logger.info("Skipping Stackdriver Profiler Python agent initialization. Set environment variable ENABLE_PROFILER=1 to enable.") - - try: - sampler = always_on.AlwaysOnSampler() - exporter = stackdriver_exporter.StackdriverExporter( - project_id=os.environ.get('GCP_PROJECT_ID'), - transport=AsyncTransport) - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(sampler, exporter) - except: - tracer_interceptor = server_interceptor.OpenCensusServerInterceptor() - - try: - googleclouddebugger.enable( - module='recommendationserver', - version='1.0.0' - ) - except Exception, err: - logger.error("could not enable debugger") - logger.error(traceback.print_exc()) - pass - port = os.environ.get('PORT', "8080") catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '') if catalog_addr == "": diff --git a/src/recommendationservice/requirements.in b/src/recommendationservice/requirements.in index bfc28a5..b9ed585 100644 --- a/src/recommendationservice/requirements.in +++ b/src/recommendationservice/requirements.in @@ -2,6 +2,8 @@ google-api-core==1.6.0 google-python-cloud-debugger==2.9 grpcio-health-checking==1.13.0 grpcio==1.16.1 -opencensus[stackdriver]==0.1.10 +opencensus==0.7.6 +opencensus-ext-zipkin==0.2.2 +opencensus-ext-grpc==0.7.1 python-json-logger==0.1.9 google-cloud-profiler==1.0.8 diff --git a/src/recommendationservice/requirements.txt b/src/recommendationservice/requirements.txt index b504fff..087a56e 100644 --- a/src/recommendationservice/requirements.txt +++ b/src/recommendationservice/requirements.txt @@ -2,25 +2,28 @@ # This file is autogenerated by pip-compile # To update, run: # -# pip-compile --output-file requirements.txt requirements.in +# pip-compile --output-file=requirements.txt requirements.in # cachetools==3.1.0 # via google-auth certifi==2018.11.29 # via requests chardet==3.0.4 # via requests -google-api-core[grpc]==1.6.0 +enum34==1.1.6 # via grpcio +futures==3.3.0 # via google-api-core, grpcio +google-api-core==1.6.0 google-api-python-client==1.7.8 # via google-cloud-profiler, google-python-cloud-debugger google-auth-httplib2==0.0.3 # via google-api-python-client, google-cloud-profiler, google-python-cloud-debugger google-auth==1.6.2 # via google-api-core, google-api-python-client, google-auth-httplib2, google-cloud-profiler, google-python-cloud-debugger -google-cloud-core==0.29.1 # via google-cloud-trace google-cloud-profiler==1.0.8 -google-cloud-trace==0.20.2 # via opencensus google-python-cloud-debugger==2.9 googleapis-common-protos==1.5.6 # via google-api-core grpcio-health-checking==1.13.0 grpcio==1.16.1 httplib2==0.12.0 # via google-api-python-client, google-auth-httplib2 idna==2.8 # via requests -opencensus[stackdriver]==0.1.10 +opencensus-context==0.1.1 # via opencensus +opencensus-ext-grpc==0.7.1 +opencensus-ext-zipkin==0.2.2 +opencensus==0.7.6 protobuf==3.6.1 # via google-api-core, google-cloud-profiler, googleapis-common-protos, grpcio-health-checking pyasn1-modules==0.2.4 # via google-auth pyasn1==0.4.5 # via pyasn1-modules, rsa @@ -32,3 +35,6 @@ rsa==4.0 # via google-auth six==1.12.0 # via google-api-core, google-api-python-client, google-auth, google-python-cloud-debugger, grpcio, protobuf uritemplate==3.0.0 # via google-api-python-client urllib3==1.24.1 # via requests + +# The following packages are considered to be unsafe in a requirements file: +# setuptools