added apis from multiple vendors

This commit is contained in:
Ilango Ravi 2020-09-09 08:50:07 -07:00
parent 48312858f3
commit fd531f0b4f
No known key found for this signature in database
GPG key ID: 3C037BF52C132530
5 changed files with 439 additions and 33 deletions

View file

@ -0,0 +1,111 @@
#!/usr/bin/python
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import time
import traceback
from concurrent import futures
import googleclouddebugger
import googlecloudprofiler
from google.auth.exceptions import DefaultCredentialsError
import grpc
from opencensus.trace.exporters import print_exporter
from opencensus.trace.exporters import stackdriver_exporter
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.samplers import always_on
import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from logger import getJSONLogger
logger = getJSONLogger('xchangerateservice-server')
from ddtrace import patch_all
patch_all()
def initStackdriverProfiling():
project_id = None
try:
project_id = os.environ["GCP_PROJECT_ID"]
except KeyError:
# Environment variable not set
pass
for retry in xrange(1,4):
try:
if project_id:
googlecloudprofiler.start(service='xchangerate_server', service_version='1.0.0', verbose=0, project_id=project_id)
else:
googlecloudprofiler.start(service='xchangerate_server', service_version='1.0.0', verbose=0)
logger.info("Successfully started Stackdriver Profiler.")
return
except (BaseException) as exc:
logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc))
if (retry < 4):
logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10))
time.sleep (1)
else:
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
return
class xChangeRateiService():
def __init__(self):
API_KEY = '6a565ada48da0541918ea2a9' # os.environ("EXCHANGERATE_API_KEY")
url = 'https://v6.exchangerate-api.com/v6/'+ API_KEY + '/latest/USD'
response = requests.get(url)
data = response.json()
self.rates = data['conversion_rates']
def convert (self, amt, from_curr, to_curr):
try:
return amt * self.rates[to_curr] / self.rates[from_curr]
except:
raise
if __name__ == "__main__":
logger.info("initializing xchangerateservice")
port = os.environ.get('PORT', "8081")
api_key = os.environ.get('EXCHANGERATE_API_KEY', '')
if api_key == "":
raise Exception('EXCHANGERATE_API_KEY environment variable not set')
# create gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
interceptors=(tracer_interceptor,))
# add class to gRPC server
service = xChangeRateiService()
demo_pb2_grpc.add_xChangeRateiServiceServicer_to_server(service, server)
health_pb2_grpc.add_HealthServicer_to_server(service, server)
# start server
logger.info("listening on port: " + port)
server.add_insecure_port('[::]:'+port)
server.start()
# keep alive
try:
while True:
time.sleep(10000)
except KeyboardInterrupt:
server.stop(0)

View file

@ -29,6 +29,9 @@ import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from opencensus.trace.exporters import stackdriver_exporter
from opencensus.trace.exporters import print_exporter
from opencensus.trace.ext.grpc import server_interceptor
@ -72,23 +75,50 @@ class EmailService(BaseEmailService):
@staticmethod
def send_email(client, email_address, content):
response = client.send_message(
sender = client.sender_path(project_id, region, sender_id),
envelope_from_authority = '',
header_from_authority = '',
envelope_from_address = from_address,
simple_message = {
"from": {
"address_spec": from_address,
},
"to": [{
"address_spec": email_address
}],
"subject": "Your Confirmation Email",
"html_body": content
}
)
logger.info("Message sent: {}".format(response.rfc822_message_id))
from_address='from_email@microservices-demo.com'
message = Mail(
from_email=from_address,
to_emails=email_address,
subject='Your Confirmation Email',
html_content=content)
try:
sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
response = sg.send(message)
print(response.status_code)
print(response.body)
print(response.headers)
except Exception as e:
print(e.message)
logger.info("Message sent: {}".format(response.status_code))
# class EmailService(BaseEmailService):
# def __init__(self):
# raise Exception('cloud mail client not implemented')
# super().__init__()
# @staticmethod
# def send_email(client, email_address, content):
# response = client.send_message(
# sender = client.sender_path(project_id, region, sender_id),
# envelope_from_authority = '',
# header_from_authority = '',
# envelope_from_address = from_address,
# simple_message = {
# "from": {
# "address_spec": from_address,
# },
# "to": [{
# "address_spec": email_address
# }],
# "subject": "Your Confirmation Email",
# "html_body": content
# }
# )
# logger.info("Message sent: {}".format(response.rfc822_message_id))
def SendOrderConfirmation(self, request, context):
email = request.email

View file

@ -0,0 +1,109 @@
#! /usr/bin/env python3.6
"""
server.py
Stripe Recipe.
Python 3.6 or newer required.
"""
import stripe
import json
import os
from flask import Flask, render_template, jsonify, request, send_from_directory
from dotenv import load_dotenv, find_dotenv
# Setup Stripe python client library
load_dotenv(find_dotenv())
stripe.api_key = os.getenv('STRIPE_SECRET_KEY')
stripe.api_version = os.getenv('STRIPE_API_VERSION')
static_dir = str(os.path.abspath(os.path.join(
__file__, "..", os.getenv("STATIC_DIR"))))
app = Flask(__name__, static_folder=static_dir,
static_url_path="", template_folder=static_dir)
@app.route('/', methods=['GET'])
def get_example():
return render_template('index.html')
@app.route('/publishable-key', methods=['GET'])
def get_publishable_key():
return jsonify({'publishableKey': os.getenv('STRIPE_PUBLISHABLE_KEY')})
# Fetch the Checkout Session to display the JSON result on the success page
@app.route('/checkout-session', methods=['GET'])
def get_checkout_session():
id = request.args.get('sessionId')
checkout_session = stripe.checkout.Session.retrieve(id)
return jsonify(checkout_session)
@app.route('/create-checkout-session', methods=['POST'])
def create_checkout_session():
data = json.loads(request.data)
domain_url = os.getenv('DOMAIN')
price_id = os.getenv('SUBSCRIPTION_PRICE_ID')
product_id = os.getenv('DONATION_PRODUCT_ID')
line_items = [{"price": price_id, "quantity": 1}]
try:
if data['donation'] > 0:
line_items.append(
{"quantity": 1, "price_data": {"product": product_id, "unit_amount": data['donation'], "currency": "usd"}})
# Sign customer up for subscription
checkout_session = stripe.checkout.Session.create(
mode="subscription",
success_url=domain_url +
"/success.html?session_id={CHECKOUT_SESSION_ID}",
cancel_url=domain_url + "/cancel.html",
payment_method_types=["card"],
line_items=line_items
)
return jsonify({'checkoutSessionId': checkout_session['id']})
except Exception as e:
return jsonify(error=str(e)), 403
@app.route('/webhook', methods=['POST'])
def webhook_received():
# You can use webhooks to receive information about asynchronous payment events.
# For more about our webhook events check out https://stripe.com/docs/webhooks.
webhook_secret = os.getenv('STRIPE_WEBHOOK_SECRET')
request_data = json.loads(request.data)
if webhook_secret:
# Retrieve the event by verifying the signature using the raw body and secret if webhook signing is configured.
signature = request.headers.get('stripe-signature')
try:
event = stripe.Webhook.construct_event(
payload=request.data, sig_header=signature, secret=webhook_secret)
data = event['data']
except Exception as e:
return e
# Get the type of webhook event sent - used to check the status of PaymentIntents.
event_type = event['type']
else:
data = request_data['data']
event_type = request_data['type']
data_object = data['object']
if event_type == 'checkout.session.completed':
items = data_object['display_items']
customer = stripe.Customer.retrieve(data_object['customer'])
if len(items) > 0 and items[0].custom and items[0].custom.name == 'Pasha e-book':
print(
'🔔 Customer is subscribed and bought an e-book! Send the e-book to ' + customer.email)
else:
print(
'🔔 Customer is subscribed but did not buy an e-book')
return jsonify({'status': 'success'})
if __name__ == '__main__':
app.run(port=4242)

View file

@ -35,6 +35,12 @@ import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from recombee_api_client.api_client import RecombeeClient
from recombee_api_client.exceptions import APIException
from recombee_api_client.api_requests import AddPurchase, RecommendItemsToUser, Batch
import random
from logger import getJSONLogger
logger = getJSONLogger('recommendationservice-server')
@ -66,24 +72,62 @@ def initStackdriverProfiling():
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
return
# class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
# def ListRecommendations(self, request, context):
# max_responses = 5
# # fetch list of products from product catalog stub
# cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
# product_ids = [x.id for x in cat_response.products]
# filtered_products = list(set(product_ids)-set(request.product_ids))
# num_products = len(filtered_products)
# num_return = min(max_responses, num_products)
# # sample list of indicies to return
# indices = random.sample(range(num_products), num_return)
# # fetch product ids from indices
# prod_list = [filtered_products[i] for i in indices]
# logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))
# # build and return response
# response = demo_pb2.ListRecommendationsResponse()
# response.product_ids.extend(prod_list)
# return response
# def Check(self, request, context):
# return health_pb2.HealthCheckResponse(
# status=health_pb2.HealthCheckResponse.SERVING)
class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
def ListRecommendations(self, request, context):
def __init__(self):
db_id = os.environ('RECOMBEE_DB_ID')
db_private_token = os.environ('RECOMBEE_PRIVATE_TOKEN')
self.client = RecombeeClient(db_id, db_private_token)
self.purchase_requests = generate_purchases()
def generate_purchases(self):
#Generate some random purchases of items by users
PROBABILITY_PURCHASED = 0.1
NUM = 100
purchase_requests = []
for user_id in ["user-%s" % i for i in range(NUM) ]:
for item_id in ["item-%s" % i for i in range(NUM) ]:
if random.random() < PROBABILITY_PURCHASED:
request = AddPurchase(user_id, item_id, cascade_create=True)
purchase_requests.append(request)
return purchase_requests
def ListRecommendations(self, user_id):
max_responses = 5
# fetch list of products from product catalog stub
cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
product_ids = [x.id for x in cat_response.products]
filtered_products = list(set(product_ids)-set(request.product_ids))
num_products = len(filtered_products)
num_return = min(max_responses, num_products)
# sample list of indicies to return
indices = random.sample(range(num_products), num_return)
# fetch product ids from indices
prod_list = [filtered_products[i] for i in indices]
logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))
# build and return response
response = demo_pb2.ListRecommendationsResponse()
response.product_ids.extend(prod_list)
return response
try:
# Send the data to Recombee, use Batch for faster processing of larger data
client.send(Batch(self.purchase_requests))
# Get recommendations for user
recommended = client.send(RecommendItemsToUser(user_id, max_responses))
except:
raise APIException
logger.info("[Recv ListRecommendations] product_ids={}".format(recommended))
return recommended
def Check(self, request, context):
return health_pb2.HealthCheckResponse(

View file

@ -0,0 +1,112 @@
#!/usr/bin/python
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import time
import traceback
from concurrent import futures
import googleclouddebugger
import googlecloudprofiler
from google.auth.exceptions import DefaultCredentialsError
import grpc
from opencensus.trace.exporters import print_exporter
from opencensus.trace.exporters import stackdriver_exporter
from opencensus.trace.ext.grpc import server_interceptor
from opencensus.common.transports.async_ import AsyncTransport
from opencensus.trace.samplers import always_on
import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
import easypost
from logger import getJSONLogger
logger = getJSONLogger('shippingservice-server')
from ddtrace import patch_all
patch_all()
def initStackdriverProfiling():
project_id = None
try:
project_id = os.environ["GCP_PROJECT_ID"]
except KeyError:
# Environment variable not set
pass
for retry in xrange(1,4):
try:
if project_id:
googlecloudprofiler.start(service='shipping_server', service_version='1.0.0', verbose=0, project_id=project_id)
else:
googlecloudprofiler.start(service='shipping_server', service_version='1.0.0', verbose=0)
logger.info("Successfully started Stackdriver Profiler.")
return
except (BaseException) as exc:
logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc))
if (retry < 4):
logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization"%(retry*10))
time.sleep (1)
else:
logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
return
class ShippingService():
def __init__(self, from_address_pb, to_address_pb):
easypost.api_key = os.environ("EASYPOST_API_KEY")
def create_shipment(self, from_address_pb, to_address_pb):
self.shipment = easypost.Shipment.create(
to_address=self.create_address(to_address_pb),
from_address=self.create_address(from_address_pb),
parcel=self.create_parcel()
self.shipment.buy(rate=shipment.lowest_rate(carriers=['USPS'],
services=['First']))
)
if __name__ == "__main__":
logger.info("initializing xchangerateservice")
port = os.environ.get('PORT', "8082")
api_key = os.environ.get('EASYPOST_API_KEY', '')
if api_key == "":
raise Exception('EASYPOST_API_KEY environment variable not set')
# create gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
interceptors=(tracer_interceptor,))
# add class to gRPC server
service = ShippingService()
demo_pb2_grpc.add_ShippingServiceServicer_to_server(service, server)
health_pb2_grpc.add_HealthServicer_to_server(service, server)
# start server
logger.info("listening on port: " + port)
server.add_insecure_port('[::]:'+port)
server.start()
# keep alive
try:
while True:
time.sleep(10000)
except KeyboardInterrupt:
server.stop(0)