Add event support to execution subsystem

The implementation relies on nats.io

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2016-12-11 11:07:32 -08:00
parent dd39b4dcf0
commit 2ef399b315
11 changed files with 441 additions and 125 deletions

View file

@ -3,9 +3,14 @@ package main
import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"google.golang.org/grpc"
@ -15,8 +20,11 @@ import (
api "github.com/docker/containerd/api/execution"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/execution/executors/oci"
// metrics "github.com/docker/go-metrics"
metrics "github.com/docker/go-metrics"
"github.com/urfave/cli"
"github.com/nats-io/go-nats"
stand "github.com/nats-io/nats-streaming-server/server"
)
func main() {
@ -57,6 +65,11 @@ high performance container runtime
Usage: "tcp address to serve metrics on",
Value: "127.0.0.1:7897",
},
cli.StringFlag{
Name: "events-address, e",
Usage: "nats address to serve events on",
Value: nats.DefaultURL,
},
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
@ -66,11 +79,29 @@ high performance container runtime
}
app.Action = func(context *cli.Context) error {
signals := make(chan os.Signal, 2048)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1)
// if address := context.GlobalString("metrics-address"); address != "" {
// go serveMetrics(address)
// }
if address := context.GlobalString("metrics-address"); address != "" {
go serveMetrics(address)
}
eventsURL, err := url.Parse(context.GlobalString("events-address"))
if err != nil {
return err
}
no := stand.DefaultNatsServerOptions
nOpts := &no
nOpts.NoSigs = true
parts := strings.Split(eventsURL.Host, ":")
nOpts.Host = parts[0]
if len(parts) == 2 {
nOpts.Port, err = strconv.Atoi(parts[1])
} else {
nOpts.Port = nats.DefaultPort
}
s := stand.RunServerWithOpts(nil, nOpts)
defer s.Shutdown()
path := context.GlobalString("socket")
if path == "" {
@ -84,10 +115,25 @@ high performance container runtime
var executor execution.Executor
switch context.GlobalString("runtime") {
case "runc":
executor = oci.New(context.GlobalString("root"))
executor, err = oci.New(context.GlobalString("root"))
if err != nil {
return err
}
}
execService, err := execution.New(executor)
// Start events listener
nc, err := nats.Connect(context.GlobalString("events-address"))
if err != nil {
return err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return err
}
defer nec.Close()
execService, err := execution.New(executor, nec)
if err != nil {
return err
}
@ -98,6 +144,8 @@ high performance container runtime
for s := range signals {
switch s {
case syscall.SIGUSR1:
dumpStacks()
default:
logrus.WithField("signal", s).Info("containerd: stopping GRPC server")
server.Stop()
@ -122,13 +170,13 @@ func createUnixSocket(path string) (net.Listener, error) {
return net.Listen("unix", path)
}
// func serveMetrics(address string) {
// m := http.NewServeMux()
// m.Handle("/metrics", metrics.Handler())
// if err := http.ListenAndServe(address, m); err != nil {
// logrus.WithError(err).Fatal("containerd: metrics server failure")
// }
// }
func serveMetrics(address string) {
m := http.NewServeMux()
m.Handle("/metrics", metrics.Handler())
if err := http.ListenAndServe(address, m); err != nil {
logrus.WithError(err).Fatal("containerd: metrics server failure")
}
}
func serveGRPC(server *grpc.Server, l net.Listener) {
defer l.Close()
@ -137,3 +185,19 @@ func serveGRPC(server *grpc.Server, l net.Listener) {
logrus.WithError(err).Fatal("containerd: GRPC server failure")
}
}
// DumpStacks dumps the runtime stack.
func dumpStacks() {
var (
buf []byte
stackSize int
)
bufferLen := 16384
for stackSize == len(buf) {
buf = make([]byte, bufferLen)
stackSize = runtime.Stack(buf, true)
bufferLen *= 2
}
buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}

View file

@ -4,11 +4,12 @@ import (
"fmt"
"os"
"path/filepath"
"time"
gocontext "context"
"github.com/docker/containerd/api/execution"
execEvents "github.com/docker/containerd/execution"
"github.com/nats-io/go-nats"
"github.com/urfave/cli"
)
@ -56,6 +57,27 @@ var runCommand = cli.Command{
return err
}
// setup our event subscriber
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return err
}
defer nec.Close()
evCh := make(chan *execEvents.ContainerExitEvent, 64)
sub, err := nec.Subscribe(execEvents.ContainersEventsSubjectSubscriber, func(e *execEvents.ContainerExitEvent) {
evCh <- e
})
if err != nil {
return err
}
defer sub.Unsubscribe()
tmpDir, err := getTempDir(id)
if err != nil {
return err
@ -87,18 +109,17 @@ var runCommand = cli.Command{
return err
}
// wait for it to die
var ec uint32
for {
gcr, err := executionService.Get(gocontext.Background(), &execution.GetContainerRequest{
ID: cr.Container.ID,
})
if err != nil {
return err
}
if gcr.Container.Status != execution.Status_RUNNING {
e, more := <-evCh
if !more {
break
}
if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID {
ec = e.StatusCode
break
}
time.Sleep(100 * time.Millisecond)
}
if _, err := executionService.Delete(gocontext.Background(), &execution.DeleteContainerRequest{
@ -110,6 +131,8 @@ var runCommand = cli.Command{
// Ensure we read all io
fwg.Wait()
os.Exit(int(ec))
return nil
},
}