diff --git a/src/currencyservice/currency_server.py b/src/currencyservice/currency_server.py new file mode 100644 index 0000000..cba3073 --- /dev/null +++ b/src/currencyservice/currency_server.py @@ -0,0 +1,111 @@ +#!/usr/bin/python +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import random +import time +import traceback +from concurrent import futures + +import googleclouddebugger +import googlecloudprofiler +from google.auth.exceptions import DefaultCredentialsError +import grpc +from opencensus.trace.exporters import print_exporter +from opencensus.trace.exporters import stackdriver_exporter +from opencensus.trace.ext.grpc import server_interceptor +from opencensus.common.transports.async_ import AsyncTransport +from opencensus.trace.samplers import always_on + +import demo_pb2 +import demo_pb2_grpc +from grpc_health.v1 import health_pb2 +from grpc_health.v1 import health_pb2_grpc + +from logger import getJSONLogger +logger = getJSONLogger('xchangerateservice-server') + +from ddtrace import patch_all +patch_all() + +def initStackdriverProfiling(): + project_id = None + try: + project_id = os.environ["GCP_PROJECT_ID"] + except KeyError: + # Environment variable not set + pass + + for retry in xrange(1,4): + try: + if project_id: + googlecloudprofiler.start(service='xchangerate_server', service_version='1.0.0', verbose=0, project_id=project_id) + else: + googlecloudprofiler.start(service='xchangerate_server', service_version='1.0.0', verbose=0) + logger.info("Successfully started Stackdriver Profiler.") + return + except (BaseException) as exc: + logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc)) + if (retry < 4): + logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10)) + time.sleep (1) + else: + logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up") + return + +class xChangeRateiService(): + + def __init__(self): + API_KEY = '6a565ada48da0541918ea2a9' # os.environ("EXCHANGERATE_API_KEY") + url = 'https://v6.exchangerate-api.com/v6/'+ API_KEY + '/latest/USD' + response = requests.get(url) + data = response.json() + self.rates = data['conversion_rates'] + + def convert (self, amt, from_curr, to_curr): + try: + return amt * self.rates[to_curr] / self.rates[from_curr] + except: + raise + +if __name__ == "__main__": + logger.info("initializing xchangerateservice") + + port = os.environ.get('PORT', "8081") + api_key = os.environ.get('EXCHANGERATE_API_KEY', '') + if api_key == "": + raise Exception('EXCHANGERATE_API_KEY environment variable not set') + + # create gRPC server + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), + interceptors=(tracer_interceptor,)) + + # add class to gRPC server + service = xChangeRateiService() + demo_pb2_grpc.add_xChangeRateiServiceServicer_to_server(service, server) + health_pb2_grpc.add_HealthServicer_to_server(service, server) + + # start server + logger.info("listening on port: " + port) + server.add_insecure_port('[::]:'+port) + server.start() + + # keep alive + try: + while True: + time.sleep(10000) + except KeyboardInterrupt: + server.stop(0) diff --git a/src/emailservice/email_server.py b/src/emailservice/email_server.py index ee5fc71..a1a261b 100644 --- a/src/emailservice/email_server.py +++ b/src/emailservice/email_server.py @@ -29,6 +29,9 @@ import demo_pb2_grpc from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc +from sendgrid import SendGridAPIClient +from sendgrid.helpers.mail import Mail + from opencensus.trace.exporters import stackdriver_exporter from opencensus.trace.exporters import print_exporter from opencensus.trace.ext.grpc import server_interceptor @@ -72,23 +75,50 @@ class EmailService(BaseEmailService): @staticmethod def send_email(client, email_address, content): - response = client.send_message( - sender = client.sender_path(project_id, region, sender_id), - envelope_from_authority = '', - header_from_authority = '', - envelope_from_address = from_address, - simple_message = { - "from": { - "address_spec": from_address, - }, - "to": [{ - "address_spec": email_address - }], - "subject": "Your Confirmation Email", - "html_body": content - } - ) - logger.info("Message sent: {}".format(response.rfc822_message_id)) + + from_address='from_email@microservices-demo.com' + + message = Mail( + from_email=from_address, + to_emails=email_address, + subject='Your Confirmation Email', + html_content=content) + try: + sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY')) + response = sg.send(message) + print(response.status_code) + print(response.body) + print(response.headers) + except Exception as e: + print(e.message) + + logger.info("Message sent: {}".format(response.status_code)) + + +# class EmailService(BaseEmailService): +# def __init__(self): +# raise Exception('cloud mail client not implemented') +# super().__init__() + +# @staticmethod +# def send_email(client, email_address, content): +# response = client.send_message( +# sender = client.sender_path(project_id, region, sender_id), +# envelope_from_authority = '', +# header_from_authority = '', +# envelope_from_address = from_address, +# simple_message = { +# "from": { +# "address_spec": from_address, +# }, +# "to": [{ +# "address_spec": email_address +# }], +# "subject": "Your Confirmation Email", +# "html_body": content +# } +# ) +# logger.info("Message sent: {}".format(response.rfc822_message_id)) def SendOrderConfirmation(self, request, context): email = request.email diff --git a/src/paymentservice/stripe-server.py b/src/paymentservice/stripe-server.py new file mode 100644 index 0000000..4f0161c --- /dev/null +++ b/src/paymentservice/stripe-server.py @@ -0,0 +1,109 @@ +#! /usr/bin/env python3.6 + +""" +server.py +Stripe Recipe. +Python 3.6 or newer required. +""" + +import stripe +import json +import os + +from flask import Flask, render_template, jsonify, request, send_from_directory +from dotenv import load_dotenv, find_dotenv + +# Setup Stripe python client library +load_dotenv(find_dotenv()) +stripe.api_key = os.getenv('STRIPE_SECRET_KEY') +stripe.api_version = os.getenv('STRIPE_API_VERSION') + +static_dir = str(os.path.abspath(os.path.join( + __file__, "..", os.getenv("STATIC_DIR")))) +app = Flask(__name__, static_folder=static_dir, + static_url_path="", template_folder=static_dir) + + +@app.route('/', methods=['GET']) +def get_example(): + return render_template('index.html') + + +@app.route('/publishable-key', methods=['GET']) +def get_publishable_key(): + return jsonify({'publishableKey': os.getenv('STRIPE_PUBLISHABLE_KEY')}) + +# Fetch the Checkout Session to display the JSON result on the success page +@app.route('/checkout-session', methods=['GET']) +def get_checkout_session(): + id = request.args.get('sessionId') + checkout_session = stripe.checkout.Session.retrieve(id) + return jsonify(checkout_session) + + +@app.route('/create-checkout-session', methods=['POST']) +def create_checkout_session(): + data = json.loads(request.data) + domain_url = os.getenv('DOMAIN') + price_id = os.getenv('SUBSCRIPTION_PRICE_ID') + product_id = os.getenv('DONATION_PRODUCT_ID') + line_items = [{"price": price_id, "quantity": 1}] + + try: + if data['donation'] > 0: + line_items.append( + {"quantity": 1, "price_data": {"product": product_id, "unit_amount": data['donation'], "currency": "usd"}}) + # Sign customer up for subscription + checkout_session = stripe.checkout.Session.create( + mode="subscription", + success_url=domain_url + + "/success.html?session_id={CHECKOUT_SESSION_ID}", + cancel_url=domain_url + "/cancel.html", + payment_method_types=["card"], + line_items=line_items + ) + + return jsonify({'checkoutSessionId': checkout_session['id']}) + except Exception as e: + return jsonify(error=str(e)), 403 + + +@app.route('/webhook', methods=['POST']) +def webhook_received(): + # You can use webhooks to receive information about asynchronous payment events. + # For more about our webhook events check out https://stripe.com/docs/webhooks. + webhook_secret = os.getenv('STRIPE_WEBHOOK_SECRET') + request_data = json.loads(request.data) + + if webhook_secret: + # Retrieve the event by verifying the signature using the raw body and secret if webhook signing is configured. + signature = request.headers.get('stripe-signature') + try: + event = stripe.Webhook.construct_event( + payload=request.data, sig_header=signature, secret=webhook_secret) + data = event['data'] + except Exception as e: + return e + # Get the type of webhook event sent - used to check the status of PaymentIntents. + event_type = event['type'] + else: + data = request_data['data'] + event_type = request_data['type'] + data_object = data['object'] + + if event_type == 'checkout.session.completed': + items = data_object['display_items'] + customer = stripe.Customer.retrieve(data_object['customer']) + + if len(items) > 0 and items[0].custom and items[0].custom.name == 'Pasha e-book': + print( + '🔔 Customer is subscribed and bought an e-book! Send the e-book to ' + customer.email) + else: + print( + '🔔 Customer is subscribed but did not buy an e-book') + + return jsonify({'status': 'success'}) + + +if __name__ == '__main__': + app.run(port=4242) diff --git a/src/recommendationservice/recommendation_server.py b/src/recommendationservice/recommendation_server.py index 67d598a..9017e93 100644 --- a/src/recommendationservice/recommendation_server.py +++ b/src/recommendationservice/recommendation_server.py @@ -35,6 +35,12 @@ import demo_pb2_grpc from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc +from recombee_api_client.api_client import RecombeeClient +from recombee_api_client.exceptions import APIException +from recombee_api_client.api_requests import AddPurchase, RecommendItemsToUser, Batch +import random + + from logger import getJSONLogger logger = getJSONLogger('recommendationservice-server') @@ -66,24 +72,62 @@ def initStackdriverProfiling(): logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up") return +# class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer): +# def ListRecommendations(self, request, context): +# max_responses = 5 +# # fetch list of products from product catalog stub +# cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) +# product_ids = [x.id for x in cat_response.products] +# filtered_products = list(set(product_ids)-set(request.product_ids)) +# num_products = len(filtered_products) +# num_return = min(max_responses, num_products) +# # sample list of indicies to return +# indices = random.sample(range(num_products), num_return) +# # fetch product ids from indices +# prod_list = [filtered_products[i] for i in indices] +# logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list)) +# # build and return response +# response = demo_pb2.ListRecommendationsResponse() +# response.product_ids.extend(prod_list) +# return response + +# def Check(self, request, context): +# return health_pb2.HealthCheckResponse( +# status=health_pb2.HealthCheckResponse.SERVING) + + class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer): - def ListRecommendations(self, request, context): + def __init__(self): + db_id = os.environ('RECOMBEE_DB_ID') + db_private_token = os.environ('RECOMBEE_PRIVATE_TOKEN') + self.client = RecombeeClient(db_id, db_private_token) + self.purchase_requests = generate_purchases() + + def generate_purchases(self): + #Generate some random purchases of items by users + PROBABILITY_PURCHASED = 0.1 + NUM = 100 + purchase_requests = [] + + for user_id in ["user-%s" % i for i in range(NUM) ]: + for item_id in ["item-%s" % i for i in range(NUM) ]: + if random.random() < PROBABILITY_PURCHASED: + request = AddPurchase(user_id, item_id, cascade_create=True) + purchase_requests.append(request) + return purchase_requests + + def ListRecommendations(self, user_id): max_responses = 5 - # fetch list of products from product catalog stub - cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) - product_ids = [x.id for x in cat_response.products] - filtered_products = list(set(product_ids)-set(request.product_ids)) - num_products = len(filtered_products) - num_return = min(max_responses, num_products) - # sample list of indicies to return - indices = random.sample(range(num_products), num_return) - # fetch product ids from indices - prod_list = [filtered_products[i] for i in indices] - logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list)) - # build and return response - response = demo_pb2.ListRecommendationsResponse() - response.product_ids.extend(prod_list) - return response + try: + # Send the data to Recombee, use Batch for faster processing of larger data + client.send(Batch(self.purchase_requests)) + # Get recommendations for user + recommended = client.send(RecommendItemsToUser(user_id, max_responses)) + except: + raise APIException + + logger.info("[Recv ListRecommendations] product_ids={}".format(recommended)) + return recommended def Check(self, request, context): return health_pb2.HealthCheckResponse( diff --git a/src/shippingservice/shipping_server.py b/src/shippingservice/shipping_server.py new file mode 100644 index 0000000..b35ecae --- /dev/null +++ b/src/shippingservice/shipping_server.py @@ -0,0 +1,112 @@ +#!/usr/bin/python +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import random +import time +import traceback +from concurrent import futures + +import googleclouddebugger +import googlecloudprofiler +from google.auth.exceptions import DefaultCredentialsError +import grpc +from opencensus.trace.exporters import print_exporter +from opencensus.trace.exporters import stackdriver_exporter +from opencensus.trace.ext.grpc import server_interceptor +from opencensus.common.transports.async_ import AsyncTransport +from opencensus.trace.samplers import always_on + +import demo_pb2 +import demo_pb2_grpc +from grpc_health.v1 import health_pb2 +from grpc_health.v1 import health_pb2_grpc + +import easypost + +from logger import getJSONLogger +logger = getJSONLogger('shippingservice-server') + +from ddtrace import patch_all +patch_all() + +def initStackdriverProfiling(): + project_id = None + try: + project_id = os.environ["GCP_PROJECT_ID"] + except KeyError: + # Environment variable not set + pass + + for retry in xrange(1,4): + try: + if project_id: + googlecloudprofiler.start(service='shipping_server', service_version='1.0.0', verbose=0, project_id=project_id) + else: + googlecloudprofiler.start(service='shipping_server', service_version='1.0.0', verbose=0) + logger.info("Successfully started Stackdriver Profiler.") + return + except (BaseException) as exc: + logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc)) + if (retry < 4): + logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10)) + time.sleep (1) + else: + logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up") + return + +class ShippingService(): + + def __init__(self, from_address_pb, to_address_pb): + easypost.api_key = os.environ("EASYPOST_API_KEY") + + def create_shipment(self, from_address_pb, to_address_pb): + self.shipment = easypost.Shipment.create( + to_address=self.create_address(to_address_pb), + from_address=self.create_address(from_address_pb), + parcel=self.create_parcel() + self.shipment.buy(rate=shipment.lowest_rate(carriers=['USPS'], + services=['First'])) + ) + +if __name__ == "__main__": + logger.info("initializing xchangerateservice") + + port = os.environ.get('PORT', "8082") + api_key = os.environ.get('EASYPOST_API_KEY', '') + if api_key == "": + raise Exception('EASYPOST_API_KEY environment variable not set') + + # create gRPC server + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), + interceptors=(tracer_interceptor,)) + + # add class to gRPC server + service = ShippingService() + demo_pb2_grpc.add_ShippingServiceServicer_to_server(service, server) + health_pb2_grpc.add_HealthServicer_to_server(service, server) + + # start server + logger.info("listening on port: " + port) + server.add_insecure_port('[::]:'+port) + server.start() + + # keep alive + try: + while True: + time.sleep(10000) + except KeyboardInterrupt: + server.stop(0)