From 7d0b45d6c8220b713d4100f27d007f1d956cc88d Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 13 May 2020 15:49:05 -0700 Subject: [PATCH] Add trace_id and span_id fields to logs in Go services This commit adds `trace_id` and `span_id` JSON fields to the log output of the Go services. It is the first step toward trace/log correlation. --- src/checkoutservice/main.go | 71 ++++++++++++--------- src/frontend/middleware.go | 17 +++++ src/productcatalogservice/server.go | 97 +++++++++++++++++------------ src/shippingservice/main.go | 70 +++++++++++++-------- 4 files changed, 159 insertions(+), 96 deletions(-) diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index f27dcaa..4a8ab84 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "encoding/hex" "fmt" "net" "os" @@ -43,12 +44,12 @@ const ( usdCurrency = "USD" ) -var log *logrus.Logger +var logger *logrus.Logger func init() { - log = logrus.New() - log.Level = logrus.DebugLevel - log.Formatter = &logrus.JSONFormatter{ + logger = logrus.New() + logger.Level = logrus.DebugLevel + logger.Formatter = &logrus.JSONFormatter{ FieldMap: logrus.FieldMap{ logrus.FieldKeyTime: "timestamp", logrus.FieldKeyLevel: "severity", @@ -56,7 +57,7 @@ func init() { }, TimestampFormat: time.RFC3339Nano, } - log.Out = os.Stdout + logger.Out = os.Stdout } type checkoutService struct { @@ -70,17 +71,17 @@ type checkoutService struct { func main() { if os.Getenv("DISABLE_TRACING") == "" { - log.Info("Tracing enabled.") + logger.Info("Tracing enabled.") go initTracing() } else { - log.Info("Tracing disabled.") + logger.Info("Tracing disabled.") } if os.Getenv("DISABLE_PROFILER") == "" { - log.Info("Profiling enabled.") + logger.Info("Profiling enabled.") go initProfiling("checkoutservice", "1.0.0") } else { - log.Info("Profiling disabled.") + logger.Info("Profiling disabled.") } port := listenPort @@ -96,32 +97,32 @@ func main() { 
mustMapEnv(&svc.emailSvcAddr, "EMAIL_SERVICE_ADDR") mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_SERVICE_ADDR") - log.Infof("service config: %+v", svc) + logger.Infof("service config: %+v", svc) lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) if err != nil { - log.Fatal(err) + logger.Fatal(err) } var srv *grpc.Server if os.Getenv("DISABLE_STATS") == "" { - log.Info("Stats enabled.") + logger.Info("Stats enabled.") srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})) } else { - log.Info("Stats disabled.") + logger.Info("Stats disabled.") srv = grpc.NewServer() } pb.RegisterCheckoutServiceServer(srv, svc) healthpb.RegisterHealthServer(srv, svc) - log.Infof("starting to listen on tcp: %q", lis.Addr().String()) + logger.Infof("starting to listen on tcp: %q", lis.Addr().String()) err = srv.Serve(lis) - log.Fatal(err) + logger.Fatal(err) } func initJaegerTracing() { svcAddr := os.Getenv("JAEGER_SERVICE_ADDR") if svcAddr == "" { - log.Info("jaeger initialization disabled.") + logger.Info("jaeger initialization disabled.") return } @@ -134,19 +135,19 @@ func initJaegerTracing() { }, }) if err != nil { - log.Fatal(err) + logger.Fatal(err) } trace.RegisterExporter(exporter) - log.Info("jaeger initialization completed.") + logger.Info("jaeger initialization completed.") } func initStats(exporter *stackdriver.Exporter) { view.SetReportingPeriod(60 * time.Second) view.RegisterExporter(exporter) if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - log.Warn("Error registering default server views") + logger.Warn("Error registering default server views") } else { - log.Info("Registered default server views") + logger.Info("Registered default server views") } } @@ -156,20 +157,20 @@ func initStackdriverTracing() { for i := 1; i <= 3; i++ { exporter, err := stackdriver.NewExporter(stackdriver.Options{}) if err != nil { - log.Infof("failed to initialize stackdriver exporter: %+v", err) + logger.Infof("failed to initialize stackdriver exporter: %+v", 
err) } else { trace.RegisterExporter(exporter) - log.Info("registered Stackdriver tracing") + logger.Info("registered Stackdriver tracing") // Register the views to collect server stats. initStats(exporter) return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver exporter", d) + logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver exporter after retrying, giving up") + logger.Warn("could not initialize Stackdriver exporter after retrying, giving up") } func initTracing() { @@ -187,16 +188,16 @@ func initProfiling(service, version string) { // ProjectID must be set if not running on GCP. // ProjectID: "my-project", }); err != nil { - log.Warnf("failed to start profiler: %+v", err) + logger.Warnf("failed to start profiler: %+v", err) } else { - log.Info("started Stackdriver profiler") + logger.Info("started Stackdriver profiler") return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver profiler", d) + logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver profiler after retrying, giving up") + logger.Warn("could not initialize Stackdriver profiler after retrying, giving up") } func mustMapEnv(target *string, envKey string) { @@ -216,6 +217,7 @@ func (cs *checkoutService) Watch(req *healthpb.HealthCheckRequest, ws healthpb.H } func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderRequest) (*pb.PlaceOrderResponse, error) { + log := logger.WithFields(getTraceLogFields(ctx)) log.Infof("[PlaceOrder] user_id=%q user_currency=%q", req.UserId, req.UserCurrency) orderID, err := uuid.NewUUID() @@ -428,3 +430,16 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i } // TODO: Dial and create client once, reuse. 
+ +func getTraceLogFields(ctx context.Context) logrus.Fields { + span := trace.FromContext(ctx) + if span == nil { + return logrus.Fields{} + } + traceID := span.SpanContext().TraceID + spanID := span.SpanContext().SpanID + return logrus.Fields{ + "trace_id": hex.EncodeToString(traceID[:]), + "span_id": hex.EncodeToString(spanID[:]), + } +} diff --git a/src/frontend/middleware.go b/src/frontend/middleware.go index 4d4221a..8d05451 100644 --- a/src/frontend/middleware.go +++ b/src/frontend/middleware.go @@ -16,11 +16,13 @@ package main import ( "context" + "encoding/hex" "net/http" "time" "github.com/google/uuid" "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) type ctxKeyLog struct{} @@ -68,6 +70,7 @@ func (lh *logHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if v, ok := r.Context().Value(ctxKeySessionID{}).(string); ok { log = log.WithField("session", v) } + log = withTraceContext(log, r) log.Debug("request started") defer func() { log.WithFields(logrus.Fields{ @@ -81,6 +84,20 @@ func (lh *logHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { lh.next.ServeHTTP(rr, r) } +// withTraceContext adds trace fields to log messages. +func withTraceContext(log *logrus.Entry, r *http.Request) *logrus.Entry { + span := trace.FromContext(r.Context()) + if span == nil { + return log + } + traceID := span.SpanContext().TraceID + spanID := span.SpanContext().SpanID + return log.WithFields(logrus.Fields{ + "trace_id": hex.EncodeToString(traceID[:]), + "span_id": hex.EncodeToString(spanID[:]), + }) +} + func ensureSessionID(next http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var sessionID string diff --git a/src/productcatalogservice/server.go b/src/productcatalogservice/server.go index 47e6c7a..80f3b6b 100644 --- a/src/productcatalogservice/server.go +++ b/src/productcatalogservice/server.go @@ -17,6 +17,7 @@ package main import ( "bytes" "context" + "encoding/hex" "flag" "fmt" "io/ioutil" @@ -48,7 +49,7 @@ 
import ( var ( cat pb.ListProductsResponse catalogMutex *sync.Mutex - log *logrus.Logger + logger *logrus.Logger extraLatency time.Duration port = "3550" @@ -57,8 +58,8 @@ var ( ) func init() { - log = logrus.New() - log.Formatter = &logrus.JSONFormatter{ + logger = logrus.New() + logger.Formatter = &logrus.JSONFormatter{ FieldMap: logrus.FieldMap{ logrus.FieldKeyTime: "timestamp", logrus.FieldKeyLevel: "severity", @@ -66,27 +67,27 @@ func init() { }, TimestampFormat: time.RFC3339Nano, } - log.Out = os.Stdout + logger.Out = os.Stdout catalogMutex = &sync.Mutex{} - err := readCatalogFile(&cat) + err := readCatalogFile(context.Background(), &cat) if err != nil { - log.Warnf("could not parse product catalog") + logger.Warnf("could not parse product catalog") } } func main() { if os.Getenv("DISABLE_TRACING") == "" { - log.Info("Tracing enabled.") + logger.Info("Tracing enabled.") go initTracing() } else { - log.Info("Tracing disabled.") + logger.Info("Tracing disabled.") } if os.Getenv("DISABLE_PROFILER") == "" { - log.Info("Profiling enabled.") + logger.Info("Profiling enabled.") go initProfiling("productcatalogservice", "1.0.0") } else { - log.Info("Profiling disabled.") + logger.Info("Profiling disabled.") } flag.Parse() @@ -95,10 +96,10 @@ func main() { if s := os.Getenv("EXTRA_LATENCY"); s != "" { v, err := time.ParseDuration(s) if err != nil { - log.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err) + logger.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err) } extraLatency = v - log.Infof("extra latency enabled (duration: %v)", extraLatency) + logger.Infof("extra latency enabled (duration: %v)", extraLatency) } else { extraLatency = time.Duration(0) } @@ -108,13 +109,13 @@ func main() { go func() { for { sig := <-sigs - log.Printf("Received signal: %s", sig) + logger.Printf("Received signal: %s", sig) if sig == syscall.SIGUSR1 { reloadCatalog = true - log.Infof("Enable catalog reloading") + logger.Infof("Enable 
catalog reloading") } else { reloadCatalog = false - log.Infof("Disable catalog reloading") + logger.Infof("Disable catalog reloading") } } }() @@ -122,7 +123,7 @@ func main() { if os.Getenv("PORT") != "" { port = os.Getenv("PORT") } - log.Infof("starting grpc server at :%s", port) + logger.Infof("starting grpc server at :%s", port) run(port) select {} } @@ -130,14 +131,14 @@ func main() { func run(port string) string { l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) if err != nil { - log.Fatal(err) + logger.Fatal(err) } var srv *grpc.Server if os.Getenv("DISABLE_STATS") == "" { - log.Info("Stats enabled.") + logger.Info("Stats enabled.") srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})) } else { - log.Info("Stats disabled.") + logger.Info("Stats disabled.") srv = grpc.NewServer() } @@ -152,7 +153,7 @@ func run(port string) string { func initJaegerTracing() { svcAddr := os.Getenv("JAEGER_SERVICE_ADDR") if svcAddr == "" { - log.Info("jaeger initialization disabled.") + logger.Info("jaeger initialization disabled.") return } // Register the Jaeger exporter to be able to retrieve @@ -164,19 +165,19 @@ func initJaegerTracing() { }, }) if err != nil { - log.Fatal(err) + logger.Fatal(err) } trace.RegisterExporter(exporter) - log.Info("jaeger initialization completed.") + logger.Info("jaeger initialization completed.") } func initStats(exporter *stackdriver.Exporter) { view.SetReportingPeriod(60 * time.Second) view.RegisterExporter(exporter) if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - log.Info("Error registering default server views") + logger.Info("Error registering default server views") } else { - log.Info("Registered default server views") + logger.Info("Registered default server views") } } @@ -186,21 +187,21 @@ func initStackdriverTracing() { for i := 1; i <= 3; i++ { exporter, err := stackdriver.NewExporter(stackdriver.Options{}) if err != nil { - log.Warnf("failed to initialize Stackdriver exporter: %+v", err) + 
logger.Warnf("failed to initialize Stackdriver exporter: %+v", err) } else { trace.RegisterExporter(exporter) trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) - log.Info("registered Stackdriver tracing") + logger.Info("registered Stackdriver tracing") // Register the views to collect server stats. initStats(exporter) return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver exporter", d) + logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver exporter after retrying, giving up") + logger.Warn("could not initialize Stackdriver exporter after retrying, giving up") } func initTracing() { @@ -218,21 +219,22 @@ func initProfiling(service, version string) { // ProjectID must be set if not running on GCP. // ProjectID: "my-project", }); err != nil { - log.Warnf("failed to start profiler: %+v", err) + logger.Warnf("failed to start profiler: %+v", err) } else { - log.Info("started Stackdriver profiler") + logger.Info("started Stackdriver profiler") return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver profiler", d) + logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver profiler after retrying, giving up") + logger.Warn("could not initialize Stackdriver profiler after retrying, giving up") } type productCatalog struct{} -func readCatalogFile(catalog *pb.ListProductsResponse) error { +func readCatalogFile(ctx context.Context, catalog *pb.ListProductsResponse) error { + log := logger.WithFields(getTraceLogFields(ctx)) catalogMutex.Lock() defer catalogMutex.Unlock() catalogJSON, err := ioutil.ReadFile("products.json") @@ -248,9 +250,9 @@ func readCatalogFile(catalog *pb.ListProductsResponse) error { return nil } -func parseCatalog() []*pb.Product { +func parseCatalog(ctx context.Context) 
[]*pb.Product { if reloadCatalog || len(cat.Products) == 0 { - err := readCatalogFile(&cat) + err := readCatalogFile(ctx, &cat) if err != nil { return []*pb.Product{} } @@ -266,17 +268,17 @@ func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Hea return status.Errorf(codes.Unimplemented, "health check via Watch not implemented") } -func (p *productCatalog) ListProducts(context.Context, *pb.Empty) (*pb.ListProductsResponse, error) { +func (p *productCatalog) ListProducts(ctx context.Context, _ *pb.Empty) (*pb.ListProductsResponse, error) { time.Sleep(extraLatency) - return &pb.ListProductsResponse{Products: parseCatalog()}, nil + return &pb.ListProductsResponse{Products: parseCatalog(ctx)}, nil } func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) { time.Sleep(extraLatency) var found *pb.Product - for i := 0; i < len(parseCatalog()); i++ { - if req.Id == parseCatalog()[i].Id { - found = parseCatalog()[i] + for i := 0; i < len(parseCatalog(ctx)); i++ { + if req.Id == parseCatalog(ctx)[i].Id { + found = parseCatalog(ctx)[i] } } if found == nil { @@ -289,7 +291,7 @@ func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProdu time.Sleep(extraLatency) // Intepret query as a substring match in name or description. 
var ps []*pb.Product - for _, p := range parseCatalog() { + for _, p := range parseCatalog(ctx) { if strings.Contains(strings.ToLower(p.Name), strings.ToLower(req.Query)) || strings.Contains(strings.ToLower(p.Description), strings.ToLower(req.Query)) { ps = append(ps, p) @@ -297,3 +299,16 @@ func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProdu } return &pb.SearchProductsResponse{Results: ps}, nil } + +func getTraceLogFields(ctx context.Context) logrus.Fields { + span := trace.FromContext(ctx) + if span == nil { + return logrus.Fields{} + } + traceID := span.SpanContext().TraceID + spanID := span.SpanContext().SpanID + return logrus.Fields{ + "trace_id": hex.EncodeToString(traceID[:]), + "span_id": hex.EncodeToString(spanID[:]), + } +} diff --git a/src/shippingservice/main.go b/src/shippingservice/main.go index 0a30752..aae5408 100644 --- a/src/shippingservice/main.go +++ b/src/shippingservice/main.go @@ -15,6 +15,7 @@ package main import ( + "encoding/hex" "fmt" "net" "os" @@ -41,12 +42,12 @@ const ( defaultPort = "50051" ) -var log *logrus.Logger +var logger *logrus.Logger func init() { - log = logrus.New() - log.Level = logrus.DebugLevel - log.Formatter = &logrus.JSONFormatter{ + logger = logrus.New() + logger.Level = logrus.DebugLevel + logger.Formatter = &logrus.JSONFormatter{ FieldMap: logrus.FieldMap{ logrus.FieldKeyTime: "timestamp", logrus.FieldKeyLevel: "severity", @@ -54,22 +55,22 @@ func init() { }, TimestampFormat: time.RFC3339Nano, } - log.Out = os.Stdout + logger.Out = os.Stdout } func main() { if os.Getenv("DISABLE_TRACING") == "" { - log.Info("Tracing enabled.") + logger.Info("Tracing enabled.") go initTracing() } else { - log.Info("Tracing disabled.") + logger.Info("Tracing disabled.") } if os.Getenv("DISABLE_PROFILER") == "" { - log.Info("Profiling enabled.") + logger.Info("Profiling enabled.") go initProfiling("shippingservice", "1.0.0") } else { - log.Info("Profiling disabled.") + logger.Info("Profiling disabled.") } 
port := defaultPort @@ -80,26 +81,26 @@ func main() { lis, err := net.Listen("tcp", port) if err != nil { - log.Fatalf("failed to listen: %v", err) + logger.Fatalf("failed to listen: %v", err) } var srv *grpc.Server if os.Getenv("DISABLE_STATS") == "" { - log.Info("Stats enabled.") + logger.Info("Stats enabled.") srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})) } else { - log.Info("Stats disabled.") + logger.Info("Stats disabled.") srv = grpc.NewServer() } svc := &server{} pb.RegisterShippingServiceServer(srv, svc) healthpb.RegisterHealthServer(srv, svc) - log.Infof("Shipping Service listening on port %s", port) + logger.Infof("Shipping Service listening on port %s", port) // Register reflection service on gRPC server. reflection.Register(srv) if err := srv.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) + logger.Fatalf("failed to serve: %v", err) } } @@ -117,6 +118,7 @@ func (s *server) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_Watc // GetQuote produces a shipping quote (cost) in USD. func (s *server) GetQuote(ctx context.Context, in *pb.GetQuoteRequest) (*pb.GetQuoteResponse, error) { + log := logger.WithFields(getTraceLogFields(ctx)) log.Info("[GetQuote] received request") defer log.Info("[GetQuote] completed request") @@ -142,6 +144,7 @@ func (s *server) GetQuote(ctx context.Context, in *pb.GetQuoteRequest) (*pb.GetQ // ShipOrder mocks that the requested items will be shipped. // It supplies a tracking ID for notional lookup of shipment delivery status. func (s *server) ShipOrder(ctx context.Context, in *pb.ShipOrderRequest) (*pb.ShipOrderResponse, error) { + log := logger.WithFields(getTraceLogFields(ctx)) log.Info("[ShipOrder] received request") defer log.Info("[ShipOrder] completed request") // 1. 
Create a Tracking ID @@ -157,7 +160,7 @@ func (s *server) ShipOrder(ctx context.Context, in *pb.ShipOrderRequest) (*pb.Sh func initJaegerTracing() { svcAddr := os.Getenv("JAEGER_SERVICE_ADDR") if svcAddr == "" { - log.Info("jaeger initialization disabled.") + logger.Info("jaeger initialization disabled.") return } @@ -170,19 +173,19 @@ func initJaegerTracing() { }, }) if err != nil { - log.Fatal(err) + logger.Fatal(err) } trace.RegisterExporter(exporter) - log.Info("jaeger initialization completed.") + logger.Info("jaeger initialization completed.") } func initStats(exporter *stackdriver.Exporter) { view.SetReportingPeriod(60 * time.Second) view.RegisterExporter(exporter) if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - log.Warn("Error registering default server views") + logger.Warn("Error registering default server views") } else { - log.Info("Registered default server views") + logger.Info("Registered default server views") } } @@ -192,21 +195,21 @@ func initStackdriverTracing() { for i := 1; i <= 3; i++ { exporter, err := stackdriver.NewExporter(stackdriver.Options{}) if err != nil { - log.Warnf("failed to initialize Stackdriver exporter: %+v", err) + logger.Warnf("failed to initialize Stackdriver exporter: %+v", err) } else { trace.RegisterExporter(exporter) trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) - log.Info("registered Stackdriver tracing") + logger.Info("registered Stackdriver tracing") // Register the views to collect server stats. 
initStats(exporter) return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver exporter", d) + logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver exporter after retrying, giving up") + logger.Warn("could not initialize Stackdriver exporter after retrying, giving up") } func initTracing() { @@ -224,14 +227,27 @@ func initProfiling(service, version string) { // ProjectID must be set if not running on GCP. // ProjectID: "my-project", }); err != nil { - log.Warnf("failed to start profiler: %+v", err) + logger.Warnf("failed to start profiler: %+v", err) } else { - log.Info("started Stackdriver profiler") + logger.Info("started Stackdriver profiler") return } d := time.Second * 10 * time.Duration(i) - log.Infof("sleeping %v to retry initializing Stackdriver profiler", d) + logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d) time.Sleep(d) } - log.Warn("could not initialize Stackdriver profiler after retrying, giving up") + logger.Warn("could not initialize Stackdriver profiler after retrying, giving up") +} + +func getTraceLogFields(ctx context.Context) logrus.Fields { + span := trace.FromContext(ctx) + if span == nil { + return logrus.Fields{} + } + traceID := span.SpanContext().TraceID + spanID := span.SpanContext().SpanID + return logrus.Fields{ + "trace_id": hex.EncodeToString(traceID[:]), + "span_id": hex.EncodeToString(spanID[:]), + } }