log: use python-json-logger and create CustomJsonFormatter

python-json-logger is one of the most popular JSON formatter for Python logging
and it requires a custom class to fit the log spec of Stackdriver Logging.
As Python projects in this project do not share the logger module,
the custom logger module needs to be duplicated in each micro services.
This commit is contained in:
Yoshi Yamaguchi 2018-10-03 12:58:14 +09:00
parent 8e6e7b1113
commit e858902995
8 changed files with 69 additions and 85 deletions

View file

@ -15,16 +15,12 @@
# limitations under the License. # limitations under the License.
import grpc import grpc
import logging
import demo_pb2 import demo_pb2
import demo_pb2_grpc import demo_pb2_grpc
from logger import JSONStreamHandler from logger import getJSONLogger
logger = getJSONLogger('emailservice-client')
log = logging.getLogger('emailservice')
log.setLevel(logging.INFO)
log.addHandler(JSONStreamHandler())
# from opencensus.trace.tracer import Tracer # from opencensus.trace.tracer import Tracer
# from opencensus.trace.exporters import stackdriver_exporter # from opencensus.trace.exporters import stackdriver_exporter
@ -46,10 +42,10 @@ def send_confirmation_email(email, order):
email = email, email = email,
order = order order = order
)) ))
log.info('Request sent.') logger.info('Request sent.')
except grpc.RpcError as err: except grpc.RpcError as err:
log.error(err.details()) logger.error(err.details())
log.error('{}, {}'.format(err.code().name, err.code().value)) logger.error('{}, {}'.format(err.code().name, err.code().value))
if __name__ == '__main__': if __name__ == '__main__':
log.info('Client for email service.') logger.info('Client for email service.')

View file

@ -16,7 +16,6 @@
from concurrent import futures from concurrent import futures
import argparse import argparse
import logging
import os import os
import sys import sys
import time import time
@ -29,8 +28,6 @@ import demo_pb2_grpc
from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc from grpc_health.v1 import health_pb2_grpc
from logger import JSONStreamHandler
# from opencensus.trace.ext.grpc import server_interceptor # from opencensus.trace.ext.grpc import server_interceptor
# from opencensus.trace.samplers import always_on # from opencensus.trace.samplers import always_on
# from opencensus.trace.exporters import stackdriver_exporter # from opencensus.trace.exporters import stackdriver_exporter
@ -53,9 +50,8 @@ from logger import JSONStreamHandler
# except: # except:
# pass # pass
log = logging.getLogger('emailservice') from logger import getJSONLogger
log.setLevel(logging.INFO) logger = getJSONLogger('emailservice-server')
log.addHandler(JSONStreamHandler())
# Loads confirmation email template from file # Loads confirmation email template from file
env = Environment( env = Environment(
@ -92,7 +88,7 @@ class EmailService(BaseEmailService):
"html_body": content "html_body": content
} }
) )
log.info("Message sent: {}".format(response.rfc822_message_id)) logger.info("Message sent: {}".format(response.rfc822_message_id))
def SendOrderConfirmation(self, request, context): def SendOrderConfirmation(self, request, context):
email = request.email email = request.email
@ -102,7 +98,7 @@ class EmailService(BaseEmailService):
confirmation = template.render(order = order) confirmation = template.render(order = order)
except TemplateError as err: except TemplateError as err:
context.set_details("An error occurred when preparing the confirmation mail.") context.set_details("An error occurred when preparing the confirmation mail.")
log.error(err.message) logger.error(err.message)
context.set_code(grpc.StatusCode.INTERNAL) context.set_code(grpc.StatusCode.INTERNAL)
return demo_pb2.Empty() return demo_pb2.Empty()
@ -118,7 +114,7 @@ class EmailService(BaseEmailService):
class DummyEmailService(BaseEmailService): class DummyEmailService(BaseEmailService):
def SendOrderConfirmation(self, request, context): def SendOrderConfirmation(self, request, context):
log.info('A request to send order confirmation email to {} has been received.'.format(request.email)) logger.info('A request to send order confirmation email to {} has been received.'.format(request.email))
return demo_pb2.Empty() return demo_pb2.Empty()
class HealthCheck(): class HealthCheck():
@ -138,7 +134,7 @@ def start(dummy_mode):
health_pb2_grpc.add_HealthServicer_to_server(service, server) health_pb2_grpc.add_HealthServicer_to_server(service, server)
port = os.environ.get('PORT', "8080") port = os.environ.get('PORT', "8080")
log.info("listening on port: "+port) logger.info("listening on port: "+port)
server.add_insecure_port('[::]:'+port) server.add_insecure_port('[::]:'+port)
server.start() server.start()
try: try:
@ -149,5 +145,5 @@ def start(dummy_mode):
if __name__ == '__main__': if __name__ == '__main__':
log.info('starting the email service in dummy mode.') logger.info('starting the email service in dummy mode.')
start(dummy_mode = True) start(dummy_mode = True)

View file

@ -16,26 +16,25 @@
import logging import logging
import sys import sys
import json from pythonjsonlogger import jsonlogger
class JSONStreamHandler(logging.StreamHandler): # TODO(yoshifumi) this class is duplicated since other Python services are
"""JSONStreamHandler emits log data in JSON format to stdout""" # not sharing the modules for logging.
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('timestamp'):
log_record['timestamp'] = record.created
if log_record.get('severity'):
log_record['severity'] = log_record['severity'].upper()
else:
log_record['severity'] = record.levelname
def __init__(self): def getJSONLogger(name):
super().__init__(sys.stdout) logger = logging.getLogger(name)
handler = logging.StreamHandler(sys.stdout)
def emit(self, record): formatter = CustomJsonFormatter('(timestamp) (severity) (name) (message)')
try: handler.setFormatter(formatter)
msg = super().format(record) logger.addHandler(handler)
data = { logger.setLevel(logging.INFO)
'message': msg, return logger
'severity': record.levelname,
'name': 'emailservice'
}
stream = self.stream
stream.write(json.dumps(data)+'\n')
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

View file

@ -32,6 +32,7 @@ pycairo==1.17.1
pycparser==2.19 pycparser==2.19
pycrypto==2.6.1 pycrypto==2.6.1
PyGObject==3.30.1 PyGObject==3.30.1
python-json-logger==0.1.9
pytz==2018.5 pytz==2018.5
pyxdg==0.26 pyxdg==0.26
requests==2.19.1 requests==2.19.1

View file

@ -14,21 +14,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import sys
import grpc import grpc
import demo_pb2 import demo_pb2
import demo_pb2_grpc import demo_pb2_grpc
import sys
import logging
from opencensus.trace.tracer import Tracer from opencensus.trace.tracer import Tracer
from opencensus.trace.exporters import stackdriver_exporter from opencensus.trace.exporters import stackdriver_exporter
from opencensus.trace.ext.grpc import client_interceptor from opencensus.trace.ext.grpc import client_interceptor
from logger import JSONStreamHandler from logger import getJSONLogger
logger = getJSONLogger('recommendationservice-server')
log = logging.getLogger('recommendationservice')
log.setLevel(logging.INFO)
log.addHandler(JSONStreamHandler())
if __name__ == "__main__": if __name__ == "__main__":
# get port # get port
@ -52,4 +48,4 @@ if __name__ == "__main__":
request = demo_pb2.ListRecommendationsRequest(user_id="test", product_ids=["test"]) request = demo_pb2.ListRecommendationsRequest(user_id="test", product_ids=["test"])
# make call to server # make call to server
response = stub.ListRecommendations(request) response = stub.ListRecommendations(request)
log.info(response) logger.info(response)

View file

@ -16,26 +16,25 @@
import logging import logging
import sys import sys
import json from pythonjsonlogger import jsonlogger
class JSONStreamHandler(logging.StreamHandler): # TODO(yoshifumi) this class is duplicated since other Python services are
"""JSONStreamHandler emits log data in JSON format to stdout""" # not sharing the modules for logging.
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('timestamp'):
log_record['timestamp'] = record.created
if log_record.get('severity'):
log_record['severity'] = log_record['severity'].upper()
else:
log_record['severity'] = record.levelname
def __init__(self): def getJSONLogger(name):
super(JSONStreamHandler, self).__init__(sys.stdout) logger = logging.getLogger(name)
handler = logging.StreamHandler(sys.stdout)
def emit(self, record): formatter = CustomJsonFormatter('(timestamp) (severity) (name) (message)')
try: handler.setFormatter(formatter)
msg = super(JSONStreamHandler, self).format(record) logger.addHandler(handler)
data = { logger.setLevel(logging.INFO)
'message': msg, return logger
'severity': record.levelname,
'name': 'emailservice'
}
stream = self.stream
stream.write(json.dumps(data)+'\n')
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

View file

@ -18,9 +18,8 @@ import grpc
from concurrent import futures from concurrent import futures
import time import time
import traceback import traceback
import random
import os import os
import logging import random
import googleclouddebugger import googleclouddebugger
import demo_pb2 import demo_pb2
@ -28,11 +27,8 @@ import demo_pb2_grpc
from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc from grpc_health.v1 import health_pb2_grpc
from logger import JSONStreamHandler from logger import getJSONLogger
logger = getJSONLogger('recommendationservice-server')
log = logging.getLogger('recommendationservice')
log.setLevel(logging.INFO)
log.addHandler(JSONStreamHandler())
# TODO(morganmclean,ahmetb) tracing currently disabled due to memory leak (see TODO below) # TODO(morganmclean,ahmetb) tracing currently disabled due to memory leak (see TODO below)
# from opencensus.trace.ext.grpc import server_interceptor # from opencensus.trace.ext.grpc import server_interceptor
@ -53,7 +49,7 @@ class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
indices = random.sample(range(num_products), num_return) indices = random.sample(range(num_products), num_return)
# fetch product ids from indices # fetch product ids from indices
prod_list = [filtered_products[i] for i in indices] prod_list = [filtered_products[i] for i in indices]
log.info("[Recv ListRecommendations] product_ids={}".format(prod_list)) logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))
# build and return response # build and return response
response = demo_pb2.ListRecommendationsResponse() response = demo_pb2.ListRecommendationsResponse()
response.product_ids.extend(prod_list) response.product_ids.extend(prod_list)
@ -65,7 +61,7 @@ class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
if __name__ == "__main__": if __name__ == "__main__":
log.info("initializing recommendationservice") logger.info("initializing recommendationservice")
# TODO(morganmclean,ahmetb) enabling the tracing interceptor/sampler below # TODO(morganmclean,ahmetb) enabling the tracing interceptor/sampler below
# causes an unbounded memory leak eventually OOMing the container. # causes an unbounded memory leak eventually OOMing the container.
@ -83,15 +79,15 @@ if __name__ == "__main__":
version='1.0.0' version='1.0.0'
) )
except Exception, err: except Exception, err:
log.error("could not enable debugger") logger.error("could not enable debugger")
log.error(traceback.print_exc()) logger.error(traceback.print_exc())
pass pass
port = os.environ.get('PORT', "8080") port = os.environ.get('PORT', "8080")
catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '') catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
if catalog_addr == "": if catalog_addr == "":
raise Exception('PRODUCT_CATALOG_SERVICE_ADDR environment variable not set') raise Exception('PRODUCT_CATALOG_SERVICE_ADDR environment variable not set')
log.info("product catalog address: " + catalog_addr) logger.info("product catalog address: " + catalog_addr)
channel = grpc.insecure_channel(catalog_addr) channel = grpc.insecure_channel(catalog_addr)
product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel) product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel)
@ -104,7 +100,7 @@ if __name__ == "__main__":
health_pb2_grpc.add_HealthServicer_to_server(service, server) health_pb2_grpc.add_HealthServicer_to_server(service, server)
# start server # start server
log.info("listening on port: " + port) logger.info("listening on port: " + port)
server.add_insecure_port('[::]:'+port) server.add_insecure_port('[::]:'+port)
server.start() server.start()

View file

@ -20,6 +20,7 @@ opencensus==0.1.5
protobuf==3.5.2.post1 protobuf==3.5.2.post1
pyasn1==0.4.3 pyasn1==0.4.3
pyasn1-modules==0.2.2 pyasn1-modules==0.2.2
python-json-logger==0.1.9
pytz==2018.5 pytz==2018.5
PyYAML==3.13 PyYAML==3.13
requests==2.19.1 requests==2.19.1