Add trace_id and span_id fields to logs in go services

This commit adds `trace_id` and `span_id` json fields to log output in go services. It's the first step of introducing traces/logs correlation.
This commit is contained in:
Dmitry 2020-05-13 15:49:05 -07:00 committed by Dmitrii Anoshin
parent db03c38ffe
commit 7d0b45d6c8
4 changed files with 159 additions and 96 deletions

View file

@ -16,6 +16,7 @@ package main
import (
"context"
"encoding/hex"
"fmt"
"net"
"os"
@ -43,12 +44,12 @@ const (
usdCurrency = "USD"
)
var log *logrus.Logger
var logger *logrus.Logger
func init() {
log = logrus.New()
log.Level = logrus.DebugLevel
log.Formatter = &logrus.JSONFormatter{
logger = logrus.New()
logger.Level = logrus.DebugLevel
logger.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "severity",
@ -56,7 +57,7 @@ func init() {
},
TimestampFormat: time.RFC3339Nano,
}
log.Out = os.Stdout
logger.Out = os.Stdout
}
type checkoutService struct {
@ -70,17 +71,17 @@ type checkoutService struct {
func main() {
if os.Getenv("DISABLE_TRACING") == "" {
log.Info("Tracing enabled.")
logger.Info("Tracing enabled.")
go initTracing()
} else {
log.Info("Tracing disabled.")
logger.Info("Tracing disabled.")
}
if os.Getenv("DISABLE_PROFILER") == "" {
log.Info("Profiling enabled.")
logger.Info("Profiling enabled.")
go initProfiling("checkoutservice", "1.0.0")
} else {
log.Info("Profiling disabled.")
logger.Info("Profiling disabled.")
}
port := listenPort
@ -96,32 +97,32 @@ func main() {
mustMapEnv(&svc.emailSvcAddr, "EMAIL_SERVICE_ADDR")
mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_SERVICE_ADDR")
log.Infof("service config: %+v", svc)
logger.Infof("service config: %+v", svc)
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
var srv *grpc.Server
if os.Getenv("DISABLE_STATS") == "" {
log.Info("Stats enabled.")
logger.Info("Stats enabled.")
srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
} else {
log.Info("Stats disabled.")
logger.Info("Stats disabled.")
srv = grpc.NewServer()
}
pb.RegisterCheckoutServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
log.Infof("starting to listen on tcp: %q", lis.Addr().String())
logger.Infof("starting to listen on tcp: %q", lis.Addr().String())
err = srv.Serve(lis)
log.Fatal(err)
logger.Fatal(err)
}
func initJaegerTracing() {
svcAddr := os.Getenv("JAEGER_SERVICE_ADDR")
if svcAddr == "" {
log.Info("jaeger initialization disabled.")
logger.Info("jaeger initialization disabled.")
return
}
@ -134,19 +135,19 @@ func initJaegerTracing() {
},
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
trace.RegisterExporter(exporter)
log.Info("jaeger initialization completed.")
logger.Info("jaeger initialization completed.")
}
func initStats(exporter *stackdriver.Exporter) {
view.SetReportingPeriod(60 * time.Second)
view.RegisterExporter(exporter)
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
log.Warn("Error registering default server views")
logger.Warn("Error registering default server views")
} else {
log.Info("Registered default server views")
logger.Info("Registered default server views")
}
}
@ -156,20 +157,20 @@ func initStackdriverTracing() {
for i := 1; i <= 3; i++ {
exporter, err := stackdriver.NewExporter(stackdriver.Options{})
if err != nil {
log.Infof("failed to initialize stackdriver exporter: %+v", err)
logger.Infof("failed to initialize stackdriver exporter: %+v", err)
} else {
trace.RegisterExporter(exporter)
log.Info("registered Stackdriver tracing")
logger.Info("registered Stackdriver tracing")
// Register the views to collect server stats.
initStats(exporter)
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver exporter after retrying, giving up")
logger.Warn("could not initialize Stackdriver exporter after retrying, giving up")
}
func initTracing() {
@ -187,16 +188,16 @@ func initProfiling(service, version string) {
// ProjectID must be set if not running on GCP.
// ProjectID: "my-project",
}); err != nil {
log.Warnf("failed to start profiler: %+v", err)
logger.Warnf("failed to start profiler: %+v", err)
} else {
log.Info("started Stackdriver profiler")
logger.Info("started Stackdriver profiler")
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver profiler after retrying, giving up")
logger.Warn("could not initialize Stackdriver profiler after retrying, giving up")
}
func mustMapEnv(target *string, envKey string) {
@ -216,6 +217,7 @@ func (cs *checkoutService) Watch(req *healthpb.HealthCheckRequest, ws healthpb.H
}
func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderRequest) (*pb.PlaceOrderResponse, error) {
log := logger.WithFields(getTraceLogFields(ctx))
log.Infof("[PlaceOrder] user_id=%q user_currency=%q", req.UserId, req.UserCurrency)
orderID, err := uuid.NewUUID()
@ -428,3 +430,16 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
}
// TODO: Dial and create client once, reuse.
func getTraceLogFields(ctx context.Context) logrus.Fields {
span := trace.FromContext(ctx)
if span == nil {
return logrus.Fields{}
}
traceID := span.SpanContext().TraceID
spanID := span.SpanContext().SpanID
return logrus.Fields{
"trace_id": hex.EncodeToString(traceID[:]),
"span_id": hex.EncodeToString(spanID[:]),
}
}

View file

@ -16,11 +16,13 @@ package main
import (
"context"
"encoding/hex"
"net/http"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
type ctxKeyLog struct{}
@ -68,6 +70,7 @@ func (lh *logHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if v, ok := r.Context().Value(ctxKeySessionID{}).(string); ok {
log = log.WithField("session", v)
}
log = withTraceContext(log, r)
log.Debug("request started")
defer func() {
log.WithFields(logrus.Fields{
@ -81,6 +84,20 @@ func (lh *logHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
lh.next.ServeHTTP(rr, r)
}
// withTraceContext add trace fields to log messages
func withTraceContext(log *logrus.Entry, r *http.Request) *logrus.Entry {
span := trace.FromContext(r.Context())
if span == nil {
return log
}
traceID := span.SpanContext().TraceID
spanID := span.SpanContext().SpanID
return log.WithFields(logrus.Fields{
"trace_id": hex.EncodeToString(traceID[:]),
"span_id": hex.EncodeToString(spanID[:]),
})
}
func ensureSessionID(next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var sessionID string

View file

@ -17,6 +17,7 @@ package main
import (
"bytes"
"context"
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
@ -48,7 +49,7 @@ import (
var (
cat pb.ListProductsResponse
catalogMutex *sync.Mutex
log *logrus.Logger
logger *logrus.Logger
extraLatency time.Duration
port = "3550"
@ -57,8 +58,8 @@ var (
)
func init() {
log = logrus.New()
log.Formatter = &logrus.JSONFormatter{
logger = logrus.New()
logger.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "severity",
@ -66,27 +67,27 @@ func init() {
},
TimestampFormat: time.RFC3339Nano,
}
log.Out = os.Stdout
logger.Out = os.Stdout
catalogMutex = &sync.Mutex{}
err := readCatalogFile(&cat)
err := readCatalogFile(context.Background(), &cat)
if err != nil {
log.Warnf("could not parse product catalog")
logger.Warnf("could not parse product catalog")
}
}
func main() {
if os.Getenv("DISABLE_TRACING") == "" {
log.Info("Tracing enabled.")
logger.Info("Tracing enabled.")
go initTracing()
} else {
log.Info("Tracing disabled.")
logger.Info("Tracing disabled.")
}
if os.Getenv("DISABLE_PROFILER") == "" {
log.Info("Profiling enabled.")
logger.Info("Profiling enabled.")
go initProfiling("productcatalogservice", "1.0.0")
} else {
log.Info("Profiling disabled.")
logger.Info("Profiling disabled.")
}
flag.Parse()
@ -95,10 +96,10 @@ func main() {
if s := os.Getenv("EXTRA_LATENCY"); s != "" {
v, err := time.ParseDuration(s)
if err != nil {
log.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err)
logger.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err)
}
extraLatency = v
log.Infof("extra latency enabled (duration: %v)", extraLatency)
logger.Infof("extra latency enabled (duration: %v)", extraLatency)
} else {
extraLatency = time.Duration(0)
}
@ -108,13 +109,13 @@ func main() {
go func() {
for {
sig := <-sigs
log.Printf("Received signal: %s", sig)
logger.Printf("Received signal: %s", sig)
if sig == syscall.SIGUSR1 {
reloadCatalog = true
log.Infof("Enable catalog reloading")
logger.Infof("Enable catalog reloading")
} else {
reloadCatalog = false
log.Infof("Disable catalog reloading")
logger.Infof("Disable catalog reloading")
}
}
}()
@ -122,7 +123,7 @@ func main() {
if os.Getenv("PORT") != "" {
port = os.Getenv("PORT")
}
log.Infof("starting grpc server at :%s", port)
logger.Infof("starting grpc server at :%s", port)
run(port)
select {}
}
@ -130,14 +131,14 @@ func main() {
func run(port string) string {
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
var srv *grpc.Server
if os.Getenv("DISABLE_STATS") == "" {
log.Info("Stats enabled.")
logger.Info("Stats enabled.")
srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
} else {
log.Info("Stats disabled.")
logger.Info("Stats disabled.")
srv = grpc.NewServer()
}
@ -152,7 +153,7 @@ func run(port string) string {
func initJaegerTracing() {
svcAddr := os.Getenv("JAEGER_SERVICE_ADDR")
if svcAddr == "" {
log.Info("jaeger initialization disabled.")
logger.Info("jaeger initialization disabled.")
return
}
// Register the Jaeger exporter to be able to retrieve
@ -164,19 +165,19 @@ func initJaegerTracing() {
},
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
trace.RegisterExporter(exporter)
log.Info("jaeger initialization completed.")
logger.Info("jaeger initialization completed.")
}
func initStats(exporter *stackdriver.Exporter) {
view.SetReportingPeriod(60 * time.Second)
view.RegisterExporter(exporter)
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
log.Info("Error registering default server views")
logger.Info("Error registering default server views")
} else {
log.Info("Registered default server views")
logger.Info("Registered default server views")
}
}
@ -186,21 +187,21 @@ func initStackdriverTracing() {
for i := 1; i <= 3; i++ {
exporter, err := stackdriver.NewExporter(stackdriver.Options{})
if err != nil {
log.Warnf("failed to initialize Stackdriver exporter: %+v", err)
logger.Warnf("failed to initialize Stackdriver exporter: %+v", err)
} else {
trace.RegisterExporter(exporter)
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
log.Info("registered Stackdriver tracing")
logger.Info("registered Stackdriver tracing")
// Register the views to collect server stats.
initStats(exporter)
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver exporter after retrying, giving up")
logger.Warn("could not initialize Stackdriver exporter after retrying, giving up")
}
func initTracing() {
@ -218,21 +219,22 @@ func initProfiling(service, version string) {
// ProjectID must be set if not running on GCP.
// ProjectID: "my-project",
}); err != nil {
log.Warnf("failed to start profiler: %+v", err)
logger.Warnf("failed to start profiler: %+v", err)
} else {
log.Info("started Stackdriver profiler")
logger.Info("started Stackdriver profiler")
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver profiler after retrying, giving up")
logger.Warn("could not initialize Stackdriver profiler after retrying, giving up")
}
type productCatalog struct{}
func readCatalogFile(catalog *pb.ListProductsResponse) error {
func readCatalogFile(ctx context.Context, catalog *pb.ListProductsResponse) error {
log := logger.WithFields(getTraceLogFields(ctx))
catalogMutex.Lock()
defer catalogMutex.Unlock()
catalogJSON, err := ioutil.ReadFile("products.json")
@ -248,9 +250,9 @@ func readCatalogFile(catalog *pb.ListProductsResponse) error {
return nil
}
func parseCatalog() []*pb.Product {
func parseCatalog(ctx context.Context) []*pb.Product {
if reloadCatalog || len(cat.Products) == 0 {
err := readCatalogFile(&cat)
err := readCatalogFile(ctx, &cat)
if err != nil {
return []*pb.Product{}
}
@ -266,17 +268,17 @@ func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Hea
return status.Errorf(codes.Unimplemented, "health check via Watch not implemented")
}
func (p *productCatalog) ListProducts(context.Context, *pb.Empty) (*pb.ListProductsResponse, error) {
func (p *productCatalog) ListProducts(ctx context.Context, _ *pb.Empty) (*pb.ListProductsResponse, error) {
time.Sleep(extraLatency)
return &pb.ListProductsResponse{Products: parseCatalog()}, nil
return &pb.ListProductsResponse{Products: parseCatalog(ctx)}, nil
}
func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) {
time.Sleep(extraLatency)
var found *pb.Product
for i := 0; i < len(parseCatalog()); i++ {
if req.Id == parseCatalog()[i].Id {
found = parseCatalog()[i]
for i := 0; i < len(parseCatalog(ctx)); i++ {
if req.Id == parseCatalog(ctx)[i].Id {
found = parseCatalog(ctx)[i]
}
}
if found == nil {
@ -289,7 +291,7 @@ func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProdu
time.Sleep(extraLatency)
// Intepret query as a substring match in name or description.
var ps []*pb.Product
for _, p := range parseCatalog() {
for _, p := range parseCatalog(ctx) {
if strings.Contains(strings.ToLower(p.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(p.Description), strings.ToLower(req.Query)) {
ps = append(ps, p)
@ -297,3 +299,16 @@ func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProdu
}
return &pb.SearchProductsResponse{Results: ps}, nil
}
func getTraceLogFields(ctx context.Context) logrus.Fields {
span := trace.FromContext(ctx)
if span == nil {
return logrus.Fields{}
}
traceID := span.SpanContext().TraceID
spanID := span.SpanContext().SpanID
return logrus.Fields{
"trace_id": hex.EncodeToString(traceID[:]),
"span_id": hex.EncodeToString(spanID[:]),
}
}

View file

@ -15,6 +15,7 @@
package main
import (
"encoding/hex"
"fmt"
"net"
"os"
@ -41,12 +42,12 @@ const (
defaultPort = "50051"
)
var log *logrus.Logger
var logger *logrus.Logger
func init() {
log = logrus.New()
log.Level = logrus.DebugLevel
log.Formatter = &logrus.JSONFormatter{
logger = logrus.New()
logger.Level = logrus.DebugLevel
logger.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "severity",
@ -54,22 +55,22 @@ func init() {
},
TimestampFormat: time.RFC3339Nano,
}
log.Out = os.Stdout
logger.Out = os.Stdout
}
func main() {
if os.Getenv("DISABLE_TRACING") == "" {
log.Info("Tracing enabled.")
logger.Info("Tracing enabled.")
go initTracing()
} else {
log.Info("Tracing disabled.")
logger.Info("Tracing disabled.")
}
if os.Getenv("DISABLE_PROFILER") == "" {
log.Info("Profiling enabled.")
logger.Info("Profiling enabled.")
go initProfiling("shippingservice", "1.0.0")
} else {
log.Info("Profiling disabled.")
logger.Info("Profiling disabled.")
}
port := defaultPort
@ -80,26 +81,26 @@ func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
logger.Fatalf("failed to listen: %v", err)
}
var srv *grpc.Server
if os.Getenv("DISABLE_STATS") == "" {
log.Info("Stats enabled.")
logger.Info("Stats enabled.")
srv = grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
} else {
log.Info("Stats disabled.")
logger.Info("Stats disabled.")
srv = grpc.NewServer()
}
svc := &server{}
pb.RegisterShippingServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
log.Infof("Shipping Service listening on port %s", port)
logger.Infof("Shipping Service listening on port %s", port)
// Register reflection service on gRPC server.
reflection.Register(srv)
if err := srv.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
logger.Fatalf("failed to serve: %v", err)
}
}
@ -117,6 +118,7 @@ func (s *server) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_Watc
// GetQuote produces a shipping quote (cost) in USD.
func (s *server) GetQuote(ctx context.Context, in *pb.GetQuoteRequest) (*pb.GetQuoteResponse, error) {
log := logger.WithFields(getTraceLogFields(ctx))
log.Info("[GetQuote] received request")
defer log.Info("[GetQuote] completed request")
@ -142,6 +144,7 @@ func (s *server) GetQuote(ctx context.Context, in *pb.GetQuoteRequest) (*pb.GetQ
// ShipOrder mocks that the requested items will be shipped.
// It supplies a tracking ID for notional lookup of shipment delivery status.
func (s *server) ShipOrder(ctx context.Context, in *pb.ShipOrderRequest) (*pb.ShipOrderResponse, error) {
log := logger.WithFields(getTraceLogFields(ctx))
log.Info("[ShipOrder] received request")
defer log.Info("[ShipOrder] completed request")
// 1. Create a Tracking ID
@ -157,7 +160,7 @@ func (s *server) ShipOrder(ctx context.Context, in *pb.ShipOrderRequest) (*pb.Sh
func initJaegerTracing() {
svcAddr := os.Getenv("JAEGER_SERVICE_ADDR")
if svcAddr == "" {
log.Info("jaeger initialization disabled.")
logger.Info("jaeger initialization disabled.")
return
}
@ -170,19 +173,19 @@ func initJaegerTracing() {
},
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
trace.RegisterExporter(exporter)
log.Info("jaeger initialization completed.")
logger.Info("jaeger initialization completed.")
}
func initStats(exporter *stackdriver.Exporter) {
view.SetReportingPeriod(60 * time.Second)
view.RegisterExporter(exporter)
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
log.Warn("Error registering default server views")
logger.Warn("Error registering default server views")
} else {
log.Info("Registered default server views")
logger.Info("Registered default server views")
}
}
@ -192,21 +195,21 @@ func initStackdriverTracing() {
for i := 1; i <= 3; i++ {
exporter, err := stackdriver.NewExporter(stackdriver.Options{})
if err != nil {
log.Warnf("failed to initialize Stackdriver exporter: %+v", err)
logger.Warnf("failed to initialize Stackdriver exporter: %+v", err)
} else {
trace.RegisterExporter(exporter)
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
log.Info("registered Stackdriver tracing")
logger.Info("registered Stackdriver tracing")
// Register the views to collect server stats.
initStats(exporter)
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
logger.Infof("sleeping %v to retry initializing Stackdriver exporter", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver exporter after retrying, giving up")
logger.Warn("could not initialize Stackdriver exporter after retrying, giving up")
}
func initTracing() {
@ -224,14 +227,27 @@ func initProfiling(service, version string) {
// ProjectID must be set if not running on GCP.
// ProjectID: "my-project",
}); err != nil {
log.Warnf("failed to start profiler: %+v", err)
logger.Warnf("failed to start profiler: %+v", err)
} else {
log.Info("started Stackdriver profiler")
logger.Info("started Stackdriver profiler")
return
}
d := time.Second * 10 * time.Duration(i)
log.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
logger.Infof("sleeping %v to retry initializing Stackdriver profiler", d)
time.Sleep(d)
}
log.Warn("could not initialize Stackdriver profiler after retrying, giving up")
logger.Warn("could not initialize Stackdriver profiler after retrying, giving up")
}
func getTraceLogFields(ctx context.Context) logrus.Fields {
span := trace.FromContext(ctx)
if span == nil {
return logrus.Fields{}
}
traceID := span.SpanContext().TraceID
spanID := span.SpanContext().SpanID
return logrus.Fields{
"trace_id": hex.EncodeToString(traceID[:]),
"span_id": hex.EncodeToString(spanID[:]),
}
}