Added GRPC Health service to Ad Service

Also added
1. timeout to getAd RPC call in frontend.
2. Async thread for stackdriver init.
This commit is contained in:
rahulpa 2018-08-22 16:54:42 -07:00
parent f5b00ea47d
commit d74a5ae497
4 changed files with 33 additions and 9 deletions

View file

@ -41,12 +41,12 @@ spec:
readinessProbe: readinessProbe:
tcpSocket: tcpSocket:
port: 9555 port: 9555
initialDelaySeconds: 15 initialDelaySeconds: 5
periodSeconds: 10 periodSeconds: 10
livenessProbe: livenessProbe:
tcpSocket: tcpSocket:
port: 9555 port: 9555
initialDelaySeconds: 15 initialDelaySeconds: 10
periodSeconds: 10 periodSeconds: 10
--- ---
apiVersion: v1 apiVersion: v1

View file

@ -25,7 +25,7 @@ repositories {
group = "adservice" group = "adservice"
version = "0.1.0-SNAPSHOT" // CURRENT_OPENCENSUS_VERSION version = "0.1.0-SNAPSHOT" // CURRENT_OPENCENSUS_VERSION
def opencensusVersion = "0.14.0" // LATEST_OPENCENSUS_RELEASE_VERSION def opencensusVersion = "0.15.0" // LATEST_OPENCENSUS_RELEASE_VERSION
def grpcVersion = "1.10.1" // CURRENT_GRPC_VERSION def grpcVersion = "1.10.1" // CURRENT_GRPC_VERSION
def prometheusVersion = "0.3.0" def prometheusVersion = "0.3.0"
@ -51,6 +51,7 @@ dependencies {
"io.grpc:grpc-protobuf:${grpcVersion}", "io.grpc:grpc-protobuf:${grpcVersion}",
"io.grpc:grpc-stub:${grpcVersion}", "io.grpc:grpc-stub:${grpcVersion}",
"io.grpc:grpc-netty:${grpcVersion}", "io.grpc:grpc-netty:${grpcVersion}",
"io.grpc:grpc-services:${grpcVersion}",
"io.prometheus:simpleclient_httpserver:${prometheusVersion}" "io.prometheus:simpleclient_httpserver:${prometheusVersion}"
runtime "io.opencensus:opencensus-impl:${opencensusVersion}", runtime "io.opencensus:opencensus-impl:${opencensusVersion}",

View file

@ -23,7 +23,9 @@ import hipstershop.Demo.AdResponse;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.services.*;
import io.opencensus.common.Duration; import io.opencensus.common.Duration;
import io.opencensus.common.Scope; import io.opencensus.common.Scope;
import io.opencensus.contrib.grpc.metrics.RpcViews; import io.opencensus.contrib.grpc.metrics.RpcViews;
@ -55,11 +57,15 @@ public class AdService {
private int MAX_ADS_TO_SERVE = 2; private int MAX_ADS_TO_SERVE = 2;
private Server server; private Server server;
private HealthStatusManager healthMgr;
static final AdService service = new AdService(); static final AdService service = new AdService();
private void start() throws IOException { private void start() throws IOException {
int port = Integer.parseInt(System.getenv("PORT")); int port = Integer.parseInt(System.getenv("PORT"));
server = ServerBuilder.forPort(port).addService(new AdServiceImpl()).build().start(); healthMgr = new HealthStatusManager();
server = ServerBuilder.forPort(port).addService(new AdServiceImpl())
.addService(healthMgr.getHealthService()).build().start();
logger.info("Ad Service started, listening on " + port); logger.info("Ad Service started, listening on " + port);
Runtime.getRuntime() Runtime.getRuntime()
.addShutdownHook( .addShutdownHook(
@ -72,10 +78,12 @@ public class AdService {
System.err.println("*** server shut down"); System.err.println("*** server shut down");
} }
}); });
healthMgr.setStatus("", ServingStatus.SERVING);
} }
private void stop() { private void stop() {
if (server != null) { if (server != null) {
healthMgr.clearStatus("");
server.shutdown(); server.shutdown();
} }
} }
@ -173,11 +181,8 @@ public class AdService {
logger.info("Default Ads initialized"); logger.info("Default Ads initialized");
} }
/** Main launches the server from the command line. */ public static void initStackdriver() {
public static void main(String[] args) throws IOException, InterruptedException { logger.info("Initialize StackDriver");
// Add final keyword to pass checkStyle.
initializeAds();
// Registers all RPC views. // Registers all RPC views.
RpcViews.registerAllViews(); RpcViews.registerAllViews();
@ -210,6 +215,20 @@ public class AdService {
} }
} }
} }
logger.info("StackDriver initialization complete.");
}
/** Main launches the server from the command line. */
public static void main(String[] args) throws IOException, InterruptedException {
// Add final keyword to pass checkStyle.
initializeAds();
new Thread( new Runnable() {
public void run(){
initStackdriver();
}
}).start();
// Register Prometheus exporters and export metrics to a Prometheus HTTPServer. // Register Prometheus exporters and export metrics to a Prometheus HTTPServer.
PrometheusStatsCollector.createAndRegister(); PrometheusStatsCollector.createAndRegister();

View file

@ -16,6 +16,7 @@ package main
import ( import (
"context" "context"
"time"
pb "github.com/GoogleCloudPlatform/microservices-demo/src/frontend/genproto" pb "github.com/GoogleCloudPlatform/microservices-demo/src/frontend/genproto"
@ -116,6 +117,9 @@ func (fe *frontendServer) getRecommendations(ctx context.Context, userID string,
} }
func (fe *frontendServer) getAd(ctx context.Context) ([]*pb.Ad, error) { func (fe *frontendServer) getAd(ctx context.Context) ([]*pb.Ad, error) {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel()
resp, err := pb.NewAdServiceClient(fe.adSvcConn).GetAds(ctx, &pb.AdRequest{ resp, err := pb.NewAdServiceClient(fe.adSvcConn).GetAds(ctx, &pb.AdRequest{
ContextKeys: nil, ContextKeys: nil,
}) })