Integrate NATS with event subsystem

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2016-12-12 14:26:51 -08:00
parent 934940a96c
commit aa5ff88bbc
10 changed files with 165 additions and 68 deletions

View file

@ -13,14 +13,17 @@ import (
"strings" "strings"
"syscall" "syscall"
gocontext "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd" "github.com/docker/containerd"
api "github.com/docker/containerd/api/execution" api "github.com/docker/containerd/api/execution"
"github.com/docker/containerd/events"
"github.com/docker/containerd/execution" "github.com/docker/containerd/execution"
"github.com/docker/containerd/execution/executors/oci" "github.com/docker/containerd/execution/executors/oci"
"github.com/docker/containerd/log"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
"github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/nats-io/go-nats" "github.com/nats-io/go-nats"
@ -85,22 +88,10 @@ high performance container runtime
go serveMetrics(address) go serveMetrics(address)
} }
eventsURL, err := url.Parse(context.GlobalString("events-address")) s, err := startNATSServer(context)
if err != nil { if err != nil {
return err return nil
} }
no := stand.DefaultNatsServerOptions
nOpts := &no
nOpts.NoSigs = true
parts := strings.Split(eventsURL.Host, ":")
nOpts.Host = parts[0]
if len(parts) == 2 {
nOpts.Port, err = strconv.Atoi(parts[1])
} else {
nOpts.Port = nats.DefaultPort
}
s := stand.RunServerWithOpts(nil, nOpts)
defer s.Shutdown() defer s.Shutdown()
path := context.GlobalString("socket") path := context.GlobalString("socket")
@ -121,24 +112,31 @@ high performance container runtime
} }
} }
// Start events listener // Get events publisher
nc, err := nats.Connect(context.GlobalString("events-address")) nec, err := getNATSPublisher(context)
if err != nil { if err != nil {
return err return err
} }
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return err
}
defer nec.Close() defer nec.Close()
execService, err := execution.New(executor, nec) execService, err := execution.New(executor)
if err != nil { if err != nil {
return err return err
} }
server := grpc.NewServer() // Intercept the GRPC call in order to populate the correct module path
interceptor := func(ctx gocontext.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = log.WithModule(ctx, "containerd")
switch info.Server.(type) {
case api.ExecutionServiceServer:
ctx = log.WithModule(ctx, "execution")
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
default:
fmt.Println("Unknown type: %#v", info.Server)
}
return handler(ctx, req)
}
server := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
api.RegisterExecutionServiceServer(server, execService) api.RegisterExecutionServiceServer(server, execService)
go serveGRPC(server, l) go serveGRPC(server, l)
@ -201,3 +199,48 @@ func dumpStacks() {
buf = buf[:stackSize] buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
func startNATSServer(context *cli.Context) (e *stand.StanServer, err error) {
eventsURL, err := url.Parse(context.GlobalString("events-address"))
if err != nil {
return nil, err
}
no := stand.DefaultNatsServerOptions
nOpts := &no
nOpts.NoSigs = true
parts := strings.Split(eventsURL.Host, ":")
nOpts.Host = parts[0]
if len(parts) == 2 {
nOpts.Port, err = strconv.Atoi(parts[1])
} else {
nOpts.Port = nats.DefaultPort
}
defer func() {
if r := recover(); r != nil {
e = nil
if _, ok := r.(error); !ok {
err = fmt.Errorf("failed to start NATS server: %v", r)
} else {
err = r.(error)
}
}
}()
s := stand.RunServerWithOpts(nil, nOpts)
return s, nil
}
func getNATSPublisher(context *cli.Context) (*nats.EncodedConn, error) {
nc, err := nats.Connect(context.GlobalString("events-address"))
if err != nil {
return nil, err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return nil, err
}
return nec, nil
}

View file

@ -4,8 +4,8 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd" "github.com/docker/containerd"
"github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
) )

31
events/nats.go Normal file
View file

@ -0,0 +1,31 @@
package events
import (
"context"
"strings"
"github.com/docker/containerd/log"
nats "github.com/nats-io/go-nats"
)
type natsPoster struct {
nec *nats.EncodedConn
}
func GetNATSPoster(nec *nats.EncodedConn) Poster {
return &natsPoster{nec}
}
func (p *natsPoster) Post(ctx context.Context, e Event) {
subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1)
topic := getTopic(ctx)
if topic != "" {
subject = strings.Join([]string{subject, topic}, ".")
}
if subject == "" {
log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty")
}
p.nec.Publish(subject, e)
}

View file

@ -3,8 +3,8 @@ package events
import ( import (
"context" "context"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/log" "github.com/docker/containerd/log"
"github.com/sirupsen/logrus"
) )
var ( var (
@ -13,13 +13,17 @@ var (
// Poster posts the event. // Poster posts the event.
type Poster interface { type Poster interface {
Post(event Event) Post(ctx context.Context, event Event)
} }
type posterKey struct{} type posterKey struct{}
func WithPoster(ctx context.Context, poster Poster) context.Context {
return context.WithValue(ctx, posterKey{}, poster)
}
func GetPoster(ctx context.Context) Poster { func GetPoster(ctx context.Context) Poster {
poster := ctx.Value(ctx) poster := ctx.Value(posterKey{})
if poster == nil { if poster == nil {
logger := log.G(ctx) logger := log.G(ctx)
tx, _ := getTx(ctx) tx, _ := getTx(ctx)
@ -27,7 +31,7 @@ func GetPoster(ctx context.Context) Poster {
// likely means we don't have a configured event system. Just return // likely means we don't have a configured event system. Just return
// the default poster, which merely logs events. // the default poster, which merely logs events.
return posterFunc(func(event Event) { return posterFunc(func(ctx context.Context, event Event) {
fields := logrus.Fields{"event": event} fields := logrus.Fields{"event": event}
if topic != "" { if topic != "" {
@ -48,8 +52,8 @@ func GetPoster(ctx context.Context) Poster {
return poster.(Poster) return poster.(Poster)
} }
type posterFunc func(event Event) type posterFunc func(ctx context.Context, event Event)
func (fn posterFunc) Post(event Event) { func (fn posterFunc) Post(ctx context.Context, event Event) {
fn(event) fn(ctx, event)
} }

View file

@ -18,6 +18,7 @@ func nexttxID() int64 {
} }
type transaction struct { type transaction struct {
ctx context.Context
id int64 id int64
parent *transaction // if nil, no parent transaction parent *transaction // if nil, no parent transaction
finish sync.Once finish sync.Once
@ -25,17 +26,18 @@ type transaction struct {
} }
// begin creates a sub-transaction. // begin creates a sub-transaction.
func (tx *transaction) begin(poster Poster) *transaction { func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction {
id := nexttxID() id := nexttxID()
child := &transaction{ child := &transaction{
ctx: ctx,
id: id, id: id,
parent: tx, parent: tx,
start: time.Now(), start: time.Now(),
} }
// post the transaction started event // post the transaction started event
poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events poster.Post(ctx, child.makeTransactionEvent("begin")) // tranactions are really just events
return child return child
} }
@ -44,7 +46,7 @@ func (tx *transaction) begin(poster Poster) *transaction {
func (tx *transaction) commit(poster Poster) { func (tx *transaction) commit(poster Poster) {
tx.finish.Do(func() { tx.finish.Do(func() {
tx.end = time.Now() tx.end = time.Now()
poster.Post(tx.makeTransactionEvent("commit")) poster.Post(tx.ctx, tx.makeTransactionEvent("commit"))
}) })
} }
@ -53,7 +55,7 @@ func (tx *transaction) rollback(poster Poster, cause error) {
tx.end = time.Now() tx.end = time.Now()
event := tx.makeTransactionEvent("rollback") event := tx.makeTransactionEvent("rollback")
event = fmt.Sprintf("%s error=%q", event, cause.Error()) event = fmt.Sprintf("%s error=%q", event, cause.Error())
poster.Post(event) poster.Post(tx.ctx, event)
}) })
} }
@ -84,7 +86,7 @@ func getTx(ctx context.Context) (*transaction, bool) {
func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) {
poster := G(pctx) poster := G(pctx)
parent, _ := getTx(pctx) parent, _ := getTx(pctx)
tx := parent.begin(poster) tx := parent.begin(pctx, poster)
return context.WithValue(pctx, txKey{}, tx), func() { return context.WithValue(pctx, txKey{}, tx), func() {
tx.commit(poster) tx.commit(poster)

View file

@ -1,6 +1,9 @@
package execution package execution
import "time"
type ContainerEvent struct { type ContainerEvent struct {
Timestamp time.Time
ID string ID string
Action string Action string
} }
@ -16,6 +19,6 @@ const (
) )
const ( const (
containerEventsSubjectFormat = "containerd.execution.container.%s" containerEventsTopicFormat = "container.%s"
containerProcessEventsSubjectFormat = "containerd.execution.container.%s.%s" containerProcessEventsTopicFormat = "container.%s.%s"
) )

View file

@ -12,7 +12,9 @@ import (
"github.com/docker/containerd/execution" "github.com/docker/containerd/execution"
) )
var ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") var (
ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
)
func New(root string) (*OCIRuntime, error) { func New(root string) (*OCIRuntime, error) {
err := SetSubreaper(1) err := SetSubreaper(1)

19
execution/log.go Normal file
View file

@ -0,0 +1,19 @@
package execution
import (
"context"
"github.com/docker/containerd/log"
"github.com/sirupsen/logrus"
)
var ctx context.Context
func GetLogger(module string) *logrus.Entry {
if ctx == nil {
ctx = log.WithModule(context.Background(), "execution")
}
subCtx := log.WithModule(ctx, module)
return log.GetLogger(subCtx)
}

View file

@ -3,10 +3,11 @@ package execution
import ( import (
"fmt" "fmt"
"syscall" "syscall"
"time"
api "github.com/docker/containerd/api/execution" api "github.com/docker/containerd/api/execution"
"github.com/docker/containerd/events"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/nats-io/go-nats"
"github.com/opencontainers/runtime-spec/specs-go" "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -16,17 +17,15 @@ var (
ErrProcessNotFound = fmt.Errorf("Process not found") ErrProcessNotFound = fmt.Errorf("Process not found")
) )
func New(executor Executor, nec *nats.EncodedConn) (*Service, error) { func New(executor Executor) (*Service, error) {
return &Service{ return &Service{
executor: executor, executor: executor,
nec: nec,
}, nil }, nil
} }
type Service struct { type Service struct {
executor Executor executor Executor
supervisor *Supervisor supervisor *Supervisor
nec *nats.EncodedConn
} }
func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
@ -46,7 +45,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*a
procs := container.Processes() procs := container.Processes()
initProcess := procs[0] initProcess := procs[0]
s.monitorProcess(container, initProcess) s.monitorProcess(ctx, container, initProcess)
return &api.CreateContainerResponse{ return &api.CreateContainerResponse{
Container: toGRPCContainer(container), Container: toGRPCContainer(container),
@ -145,7 +144,7 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest)
return nil, err return nil, err
} }
s.monitorProcess(container, process) s.monitorProcess(ctx, container, process)
return &api.StartProcessResponse{ return &api.StartProcessResponse{
Process: toGRPCProcess(process), Process: toGRPCProcess(process),
@ -205,25 +204,19 @@ var (
_ = (api.ExecutionServiceServer)(&Service{}) _ = (api.ExecutionServiceServer)(&Service{})
) )
func (s *Service) publishEvent(name string, v interface{}) { func (s *Service) publishEvent(ctx context.Context, topic string, v interface{}) {
if s.nec == nil { ctx = events.WithTopic(ctx, topic)
return events.GetPoster(ctx).Post(ctx, v)
} }
err := s.nec.Publish(name, v) func (s *Service) monitorProcess(ctx context.Context, container *Container, process Process) {
if err != nil {
// TODO: Use logrus?
fmt.Println("Failed to publish '%s:%#v': %v", name, v, err)
}
}
func (s *Service) monitorProcess(container *Container, process Process) {
go func() { go func() {
status, err := process.Wait() status, err := process.Wait()
if err == nil { if err == nil {
subject := GetContainerProcessEventSubject(container.ID(), process.ID()) topic := GetContainerProcessEventTopic(container.ID(), process.ID())
s.publishEvent(subject, &ContainerExitEvent{ s.publishEvent(ctx, topic, &ContainerExitEvent{
ContainerEvent: ContainerEvent{ ContainerEvent: ContainerEvent{
Timestamp: time.Now(),
ID: container.ID(), ID: container.ID(),
Action: "exit", Action: "exit",
}, },
@ -234,12 +227,12 @@ func (s *Service) monitorProcess(container *Container, process Process) {
}() }()
} }
func GetContainerEventSubject(id string) string { func GetContainerEventTopic(id string) string {
return fmt.Sprintf(containerEventsSubjectFormat, id) return fmt.Sprintf(containerEventsTopicFormat, id)
} }
func GetContainerProcessEventSubject(containerID, processID string) string { func GetContainerProcessEventTopic(containerID, processID string) string {
return fmt.Sprintf(containerProcessEventsSubjectFormat, containerID, processID) return fmt.Sprintf(containerProcessEventsTopicFormat, containerID, processID)
} }
func toGRPCContainer(container *Container) *api.Container { func toGRPCContainer(container *Container) *api.Container {

View file

@ -4,7 +4,7 @@ import (
"context" "context"
"path" "path"
"github.com/Sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var ( var (