diff --git a/cmd/containerd-shim/main.go b/cmd/containerd-shim/main.go index a4ca473..a9b2296 100644 --- a/cmd/containerd-shim/main.go +++ b/cmd/containerd-shim/main.go @@ -13,7 +13,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/containerd" shimapi "github.com/docker/containerd/api/services/shim" - "github.com/docker/containerd/shim" + "github.com/docker/containerd/linux/shim" "github.com/docker/containerd/sys" "github.com/docker/containerd/utils" "github.com/urfave/cli" diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go new file mode 100644 index 0000000..44de4b2 --- /dev/null +++ b/cmd/containerd/builtins.go @@ -0,0 +1,7 @@ +package main + +// register containerd builtins here +import ( + _ "github.com/docker/containerd/services/content" + _ "github.com/docker/containerd/services/execution" +) diff --git a/cmd/containerd/config.go b/cmd/containerd/config.go index 9d2ba05..c091bc6 100644 --- a/cmd/containerd/config.go +++ b/cmd/containerd/config.go @@ -18,10 +18,11 @@ func defaultConfig() *config { // loadConfig loads the config from the provided path func loadConfig(path string) error { - _, err := toml.DecodeFile(path, conf) + md, err := toml.DecodeFile(path, conf) if err != nil { return err } + conf.md = md return nil } @@ -38,6 +39,18 @@ type config struct { Debug debug `toml:"debug"` // Metrics and monitoring settings Metrics metricsConfig `toml:"metrics"` + // Plugins provides plugin specific configuration for the initialization of a plugin + Plugins map[string]toml.Primitive `toml:"plugins"` + + md toml.MetaData +} + +func (c *config) decodePlugin(name string, v interface{}) error { + p, ok := c.Plugins[name] + if !ok { + return nil + } + return c.md.PrimitiveDecode(p, v) } type grpcConfig struct { diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index ba19aad..efe1c01 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -16,12 +16,9 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/containerd" - contentapi "github.com/docker/containerd/api/services/content" api "github.com/docker/containerd/api/services/execution" "github.com/docker/containerd/content" - _ "github.com/docker/containerd/linux" "github.com/docker/containerd/log" - "github.com/docker/containerd/services/execution" "github.com/docker/containerd/utils" metrics "github.com/docker/go-metrics" "github.com/pkg/errors" @@ -66,6 +63,10 @@ func main() { Name: "socket,s", Usage: "socket path for containerd's GRPC server", }, + cli.StringFlag{ + Name: "root", + Usage: "containerd root directory", + }, } app.Before = before app.Action = func(context *cli.Context) error { @@ -74,37 +75,41 @@ func main() { // we don't miss any signals during boot signals := make(chan os.Signal, 2048) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1) - log.G(global).Info("starting containerd boot...") - runtimes, err := loadRuntimes() - if err != nil { - return err - } - supervisor, err := containerd.NewSupervisor(log.WithModule(global, "execution"), runtimes) - if err != nil { + + // load all plugins into containerd + if err := containerd.Load(filepath.Join(conf.Root, "plugins")); err != nil { return err } // start debug and metrics APIs if err := serveDebugAPI(); err != nil { return err } - serveMetricsAPI() - - contentStore, err := resolveContentStore() + runtimes, err := loadRuntimes() + if err != nil { + return err + } + store, err := resolveContentStore() + if err != nil { + return err + } + services, err := loadServices(runtimes, store) if err != nil { return err } - // start the GRPC api with the execution service registered server := newGRPCServer() - - api.RegisterContainerServiceServer(server, execution.New(supervisor)) - contentapi.RegisterContentServer(server, content.NewService(contentStore)) - - // start the GRPC api with registered services + for _, service := range services { + if err := service.Register(server); err != nil { + return err + } + } if err := serveGRPC(server); err != nil { return err } + // start the prometheus metrics API for containerd + serveMetricsAPI() + log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds()) return handleSignals(signals, server) } @@ -209,14 +214,28 @@ func resolveContentStore() (*content.Store, error) { } func loadRuntimes() (map[string]containerd.Runtime, error) { - o := map[string]containerd.Runtime{} - for _, name := range containerd.Runtimes() { - r, err := containerd.NewRuntime(name, conf.State) + o := make(map[string]containerd.Runtime) + for name, rr := range containerd.Registrations() { + if rr.Type != containerd.RuntimePlugin { + continue + } + log.G(global).Infof("loading runtime plugin %q...", name) + ic := &containerd.InitContext{ + Root: conf.Root, + State: conf.State, + Context: log.WithModule(global, fmt.Sprintf("runtime-%s", name)), + } + if rr.Config != nil { + if err := conf.decodePlugin(name, rr.Config); err != nil { + return nil, err + } + ic.Config = rr.Config + } + vr, err := rr.Init(ic) if err != nil { return nil, err } - o[name] = r - log.G(global).WithField("runtime", name).Info("load runtime") + o[name] = vr.(containerd.Runtime) } return o, nil } @@ -226,6 +245,35 @@ func newGRPCServer() *grpc.Server { return s } +func loadServices(runtimes map[string]containerd.Runtime, store *content.Store) ([]containerd.Service, error) { + var o []containerd.Service + for name, sr := range containerd.Registrations() { + if sr.Type != containerd.GRPCPlugin { + continue + } + log.G(global).Infof("loading grpc service plugin %q...", name) + ic := &containerd.InitContext{ + Root: conf.Root, + State: conf.State, + Context: log.WithModule(global, fmt.Sprintf("service-%s", name)), + Runtimes: runtimes, + Store: store, + } + if sr.Config != nil { + if err := conf.decodePlugin(name, sr.Config); err != nil { + return nil, err + } + ic.Config = sr.Config + } + vs, err := sr.Init(ic) + if err != nil { + return nil, err + } + o = append(o, vs.(containerd.Service)) + } + return o, nil +} + func serveGRPC(server *grpc.Server) error { path := conf.GRPC.Socket if path == "" { diff --git a/container.go b/container.go index be44e7b..d1283f0 100644 --- a/container.go +++ b/container.go @@ -32,3 +32,7 @@ type State interface { // Pid is the main process id for the container Pid() uint32 } + +type ContainerMonitor interface { + Monitor(context.Context, Container) error +} diff --git a/content/content.go b/content/content.go index b63ed1d..884e4e3 100644 --- a/content/content.go +++ b/content/content.go @@ -13,7 +13,7 @@ import ( var ( errNotFound = errors.New("content: not found") - bufPool = sync.Pool{ + BufPool = sync.Pool{ New: func() interface{} { return make([]byte, 1<<20) }, diff --git a/content/helpers.go b/content/helpers.go index 4209350..be1d943 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -33,8 +33,8 @@ func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size i return errors.Errorf("cannot resume already started write") } - buf := bufPool.Get().([]byte) - defer bufPool.Put(buf) + buf := BufPool.Get().([]byte) + defer BufPool.Put(buf) nn, err := io.CopyBuffer(cw, r, buf) if err != nil { diff --git a/content/store.go b/content/store.go index dfe1993..efc0b6f 100644 --- a/content/store.go +++ b/content/store.go @@ -221,8 +221,8 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) { } defer fp.Close() - p := bufPool.Get().([]byte) - defer bufPool.Put(p) + p := BufPool.Get().([]byte) + defer BufPool.Put(p) offset, err = io.CopyBuffer(digester.Hash(), fp, p) if err != nil { diff --git a/design/plugins.md b/design/plugins.md new file mode 100644 index 0000000..c681880 --- /dev/null +++ b/design/plugins.md @@ -0,0 +1,66 @@ +# containerd Plugin Model + +With go 1.8 we now have dynamically loaded plugins via go packages. This seems to be a very easy and clean way to extent containerd. It does have the drawback of only working on Linux right now but there is where we see the most need for swapping out defaults. + +## core + +To be extended the core of containerd has to provide go packages and interfaces that can be extended with third-party packages. The core should be small but provide value for people building on top of containerd. + +The core should be comprised of the following: + +* Snapshotters - Provide way to manage the filesystems of containers and images on a host. +* Runtime - Provide a way to launch containers via the OCI runtime specification. +* Distribution - Provide a way to fetch and push content to external sources/registries. +* Content Store - Provide a generic content addressed store for bridging the gap between registries and snapshotters. +* Metadata - Provide a consistent way for the core and various subsystems to store metadata. +* Monitoring - Provide a way to monitor different subsystems, containers, and operations throughout the core with metrics and events. + +### Runtime + +The runtime code in the core provides API to create, list, and manage containers on the system. It provides a runtime type that is responsible for creating, deleting, and loading containers. + +```go +type Runtime interface { + Create(ctx context.Context, id string, opts CreateOpts) (Container, error) + Containers() ([]Container, error) + Delete(ctx context.Context, c Container) error +} +``` + +There is a common `Container` interface with common methods for interacting with the container as well as platform specific container interfaces. + + +```go +type Container interface { + Info() ContainerInfo + Start(context.Context) error + State(context.Context) (State, error) + Events(context.Context) (<-chan ContainerEvent, error) + Kill(context.Context) error +} + +type LinuxContainer interface { + Pause(context.Context) error + Resume(context.Context) error + Exec(context.Context, ExecOpts) (uint32, error) + Signal(c context.Context, pid uint32, s os.Signal) error +} + +type WindowsContainer interface { + Exec(context.Context, ExecOpts) (uint32, error) + Signal(c context.Context, pid uint32, s os.Signal) error +} +``` + +### Monitoring + +The monitoring subsystem is a way to collect events and metrics from various subsystems. +With the monitoring subsystem you can monitor various types, subsystems, and objects within the core. +This can be use to collect metrics for containers and monitor OOM events when supported. +An example of this is a prometheus monitor that exports container metrics such as cpu, memory, io, and network information. + +```go +type ContainerMonitor interface { + Monitor(context.Context, Container) error +} +``` diff --git a/event.go b/event.go index 822892a..6b9f31b 100644 --- a/event.go +++ b/event.go @@ -39,32 +39,3 @@ type Event struct { Pid uint32 ExitStatus uint32 } - -type EventWriter interface { - Write(*Event) error -} - -type EventFilter func(*Event) bool - -// NewFilterEventWriter returns an EventWriter that runs the provided filters on the events. -// If all the filters pass then the event is written to the wrapped EventWriter -func NewFilterEventWriter(w EventWriter, filters ...EventFilter) EventWriter { - return &filteredEventWriter{ - w: w, - filters: filters, - } -} - -type filteredEventWriter struct { - w EventWriter - filters []EventFilter -} - -func (f *filteredEventWriter) Write(e *Event) error { - for _, filter := range f.filters { - if !filter(e) { - return nil - } - } - return f.w.Write(e) -} diff --git a/linux/Makefile b/linux/Makefile new file mode 100644 index 0000000..9c0d447 --- /dev/null +++ b/linux/Makefile @@ -0,0 +1,7 @@ + +all: + go build -buildmode=plugin -o shim-linux-amd64.so + +install: + mkdir -p /var/lib/containerd/plugins + cp shim-linux-amd64.so /var/lib/containerd/plugins/ diff --git a/linux/container.go b/linux/container.go index 6e4ae9f..17ad0ba 100644 --- a/linux/container.go +++ b/linux/container.go @@ -1,4 +1,4 @@ -package linux +package main import ( "github.com/docker/containerd" diff --git a/linux/runtime.go b/linux/runtime.go index 99bbb78..85faa64 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -1,4 +1,4 @@ -package linux +package main import ( "bytes" @@ -24,16 +24,19 @@ const ( ) func init() { - containerd.RegisterRuntime(runtimeName, New) + containerd.Register(runtimeName, &containerd.Registration{ + Type: containerd.RuntimePlugin, + Init: New, + }) } -func New(root string) (containerd.Runtime, error) { - if err := os.MkdirAll(root, 0700); err != nil { +func New(ic *containerd.InitContext) (interface{}, error) { + if err := os.MkdirAll(ic.State, 0700); err != nil { return nil, err } - c, cancel := context.WithCancel(context.Background()) + c, cancel := context.WithCancel(ic.Context) return &Runtime{ - root: root, + root: ic.State, events: make(chan *containerd.Event, 2048), eventsContext: c, eventsCancel: cancel, @@ -110,7 +113,7 @@ func (r *Runtime) Containers() ([]containerd.Container, error) { if !fi.IsDir() { continue } - c, err := r.loadContainer(fi.Name()) + c, err := r.loadContainer(filepath.Join(r.root, fi.Name())) if err != nil { return nil, err } diff --git a/linux/shim.go b/linux/shim.go index 62cb032..9a74bb5 100644 --- a/linux/shim.go +++ b/linux/shim.go @@ -1,4 +1,4 @@ -package linux +package main import ( "fmt" diff --git a/shim/exec.go b/linux/shim/exec.go similarity index 100% rename from shim/exec.go rename to linux/shim/exec.go diff --git a/shim/init.go b/linux/shim/init.go similarity index 100% rename from shim/init.go rename to linux/shim/init.go diff --git a/shim/io.go b/linux/shim/io.go similarity index 100% rename from shim/io.go rename to linux/shim/io.go diff --git a/shim/process.go b/linux/shim/process.go similarity index 100% rename from shim/process.go rename to linux/shim/process.go diff --git a/shim/service.go b/linux/shim/service.go similarity index 100% rename from shim/service.go rename to linux/shim/service.go diff --git a/plugin.go b/plugin.go new file mode 100644 index 0000000..7397b39 --- /dev/null +++ b/plugin.go @@ -0,0 +1,111 @@ +package containerd + +import ( + "fmt" + "path/filepath" + "plugin" + "runtime" + "sync" + + "github.com/docker/containerd/content" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type PluginType int + +const ( + RuntimePlugin PluginType = iota + 1 + GRPCPlugin +) + +type Registration struct { + Type PluginType + Config interface{} + Init func(*InitContext) (interface{}, error) +} + +type InitContext struct { + Root string + State string + Runtimes map[string]Runtime + Store *content.Store + Config interface{} + Context context.Context +} + +type Service interface { + Register(*grpc.Server) error +} + +var register = struct { + sync.Mutex + r map[string]*Registration +}{ + r: make(map[string]*Registration), +} + +// Load loads all plugins at the provided that into containerd +func Load(path string) (err error) { + defer func() { + if v := recover(); v != nil { + rerr, ok := v.(error) + if !ok { + panic(v) + } + err = rerr + } + }() + return loadPlugins(path) +} + +func Register(name string, r *Registration) error { + register.Lock() + defer register.Unlock() + if _, ok := register.r[name]; ok { + return fmt.Errorf("plugin already registered as %q", name) + } + register.r[name] = r + return nil +} + +func Registrations() map[string]*Registration { + return register.r +} + +// loadPlugins loads all plugins for the OS and Arch +// that containerd is built for inside the provided path +func loadPlugins(path string) error { + abs, err := filepath.Abs(path) + if err != nil { + return err + } + pattern := filepath.Join(abs, fmt.Sprintf( + "*-%s-%s.%s", + runtime.GOOS, + runtime.GOARCH, + getLibExt(), + )) + libs, err := filepath.Glob(pattern) + if err != nil { + return err + } + for _, lib := range libs { + if _, err := plugin.Open(lib); err != nil { + return err + } + } + return nil +} + +// getLibExt returns a platform specific lib extension for +// the platform that containerd is running on +func getLibExt() string { + switch runtime.GOOS { + case "windows": + return "dll" + default: + return "so" + } +} diff --git a/runtime.go b/runtime.go index 1ab8d7d..6683329 100644 --- a/runtime.go +++ b/runtime.go @@ -1,53 +1,6 @@ package containerd -import ( - "fmt" - "sync" - - "golang.org/x/net/context" -) - -// NewRuntimeFunc is the runtime's constructor -type NewRuntimeFunc func(root string) (Runtime, error) - -var runtimeRegistration = struct { - mu sync.Mutex - runtimes map[string]NewRuntimeFunc -}{ - runtimes: make(map[string]NewRuntimeFunc), -} - -// RegisterRuntime is not external packages registers Runtimes for use with containerd -func RegisterRuntime(name string, f NewRuntimeFunc) { - runtimeRegistration.mu.Lock() - defer runtimeRegistration.mu.Unlock() - if _, ok := runtimeRegistration.runtimes[name]; ok { - panic(fmt.Errorf("runtime already registered as %q", name)) - } - runtimeRegistration.runtimes[name] = f -} - -// Runtimes returns a slice of all registered runtime names for containerd -func Runtimes() (o []string) { - runtimeRegistration.mu.Lock() - defer runtimeRegistration.mu.Unlock() - - for k := range runtimeRegistration.runtimes { - o = append(o, k) - } - return o -} - -// NewRuntime calls the runtime's constructor with the provided root -func NewRuntime(name, root string) (Runtime, error) { - runtimeRegistration.mu.Lock() - defer runtimeRegistration.mu.Unlock() - f, ok := runtimeRegistration.runtimes[name] - if !ok { - return nil, ErrRuntimeNotExist - } - return f(root) -} +import "golang.org/x/net/context" type IO struct { Stdin string @@ -73,7 +26,7 @@ type Runtime interface { // Containers returns all the current containers for the runtime Containers() ([]Container, error) // Delete removes the container in the runtime - Delete(ctx context.Context, c Container) error + Delete(context.Context, Container) error // Events returns events for the runtime and all containers created by the runtime Events(context.Context) <-chan *Event } diff --git a/content/service.go b/services/content/service.go similarity index 77% rename from content/service.go rename to services/content/service.go index a3c8184..ece427a 100644 --- a/content/service.go +++ b/services/content/service.go @@ -4,23 +4,39 @@ import ( "errors" "io" - contentapi "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd" + api "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd/content" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) type Service struct { - store *Store + store *content.Store } -var _ contentapi.ContentServer = &Service{} +var _ api.ContentServer = &Service{} -func NewService(store *Store) contentapi.ContentServer { - return &Service{store: store} +func init() { + containerd.Register("content-grpc", &containerd.Registration{ + Type: containerd.GRPCPlugin, + Init: NewService, + }) } -func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) { +func NewService(ic *containerd.InitContext) (interface{}, error) { + return &Service{ + store: ic.Store, + }, nil +} + +func (s *Service) Register(server *grpc.Server) error { + api.RegisterContentServer(server, s) + return nil +} + +func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) { if err := req.Digest.Validate(); err != nil { return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest) } @@ -30,14 +46,14 @@ func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*conte return nil, maybeNotFoundGRPC(err, req.Digest.String()) } - return &contentapi.InfoResponse{ + return &api.InfoResponse{ Digest: req.Digest, Size_: bi.Size, CommittedAt: bi.CommittedAt, }, nil } -func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_ReadServer) error { +func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) error { if err := req.Digest.Validate(); err != nil { return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err) } @@ -68,9 +84,9 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably // little inefficient for work over a fast network. We can tune this later. - p = bufPool.Get().([]byte) + p = content.BufPool.Get().([]byte) ) - defer bufPool.Put(p) + defer content.BufPool.Put(p) if offset < 0 { offset = 0 @@ -95,11 +111,11 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R type readResponseWriter struct { offset int64 - session contentapi.Content_ReadServer + session api.Content_ReadServer } func (rw *readResponseWriter) Write(p []byte) (n int, err error) { - if err := rw.session.Send(&contentapi.ReadResponse{ + if err := rw.session.Send(&api.ReadResponse{ Offset: rw.offset, Data: p, }); err != nil { @@ -110,14 +126,14 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) { return len(p), nil } -func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { +func (s *Service) Write(session api.Content_WriteServer) (err error) { var ( ref string - msg contentapi.WriteResponse - req *contentapi.WriteRequest + msg api.WriteResponse + req *api.WriteRequest ) - defer func(msg *contentapi.WriteResponse) { + defer func(msg *api.WriteResponse) { // pump through the last message if no error was encountered if err != nil { return @@ -153,7 +169,7 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { // cost of the move when they collide. if req.ExpectedDigest != "" { if _, err := s.store.Info(req.ExpectedDigest); err != nil { - if !IsNotFound(err) { + if !content.IsNotFound(err) { return err } @@ -172,9 +188,9 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { msg.UpdatedAt = ws.UpdatedAt switch req.Action { - case contentapi.WriteActionStat: + case api.WriteActionStat: msg.Digest = wr.Digest() - case contentapi.WriteActionWrite, contentapi.WriteActionCommit: + case api.WriteActionWrite, api.WriteActionCommit: if req.Offset > 0 { // validate the offset if provided if req.Offset != ws.Offset { @@ -200,10 +216,10 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { msg.Offset += int64(n) } - if req.Action == contentapi.WriteActionCommit { + if req.Action == api.WriteActionCommit { return wr.Commit(req.ExpectedSize, req.ExpectedDigest) } - case contentapi.WriteActionAbort: + case api.WriteActionAbort: return s.store.Abort(ref) } @@ -220,12 +236,12 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { return nil } -func (s *Service) Status(*contentapi.StatusRequest, contentapi.Content_StatusServer) error { +func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error { return grpc.Errorf(codes.Unimplemented, "not implemented") } func maybeNotFoundGRPC(err error, id string) error { - if IsNotFound(err) { + if content.IsNotFound(err) { return grpc.Errorf(codes.NotFound, "%v: not found", id) } diff --git a/collector.go b/services/execution/collector.go similarity index 75% rename from collector.go rename to services/execution/collector.go index e69e227..b8ed395 100644 --- a/collector.go +++ b/services/execution/collector.go @@ -1,15 +1,17 @@ -package containerd +package execution import ( "sync" + "github.com/docker/containerd" + "golang.org/x/net/context" ) -func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector, error) { +func newCollector(ctx context.Context, runtimes map[string]containerd.Runtime) (*collector, error) { c := &collector{ context: ctx, - ch: make(chan *Event, 2048), + ch: make(chan *containerd.Event, 2048), eventClients: make(map[*eventClient]struct{}), } for _, r := range runtimes { @@ -27,7 +29,7 @@ func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector, type eventClient struct { eCh chan error - w EventWriter + w *grpcEventWriter } type collector struct { @@ -35,12 +37,12 @@ type collector struct { wg sync.WaitGroup context context.Context - ch chan *Event + ch chan *containerd.Event eventClients map[*eventClient]struct{} } // collect collects events from the provided runtime -func (c *collector) collect(r Runtime) error { +func (c *collector) collect(r containerd.Runtime) error { c.wg.Add(1) go func() { defer c.wg.Done() @@ -51,12 +53,7 @@ func (c *collector) collect(r Runtime) error { return nil } -// Forward forwards all events from the collector to the EventWriters -// -// It forwards events until the channels are closed or the EventWriter -// returns an error -// This is a blocking call -func (c *collector) forward(w EventWriter) error { +func (c *collector) forward(w *grpcEventWriter) error { client := &eventClient{ w: w, eCh: make(chan error, 1), diff --git a/services/execution/service.go b/services/execution/service.go index 84c07b4..7661b24 100644 --- a/services/execution/service.go +++ b/services/execution/service.go @@ -1,11 +1,14 @@ package execution import ( + "sync" + "github.com/docker/containerd" api "github.com/docker/containerd/api/services/execution" "github.com/docker/containerd/api/types/container" google_protobuf "github.com/golang/protobuf/ptypes/empty" "golang.org/x/net/context" + "google.golang.org/grpc" ) var ( @@ -13,15 +16,46 @@ var ( empty = &google_protobuf.Empty{} ) -// New creates a new GRPC service for the ContainerService -func New(s *containerd.Supervisor) *Service { - return &Service{ - s: s, +func init() { + containerd.Register("runtime-grpc", &containerd.Registration{ + Type: containerd.GRPCPlugin, + Init: New, + }) +} + +func New(ic *containerd.InitContext) (interface{}, error) { + c, err := newCollector(ic.Context, ic.Runtimes) + if err != nil { + return nil, err } + return &Service{ + runtimes: ic.Runtimes, + containers: make(map[string]containerd.Container), + collector: c, + }, nil } type Service struct { - s *containerd.Supervisor + mu sync.Mutex + + runtimes map[string]containerd.Runtime + containers map[string]containerd.Container + collector *collector +} + +func (s *Service) Register(server *grpc.Server) error { + api.RegisterContainerServiceServer(server, s) + // load all containers + for _, r := range s.runtimes { + containers, err := r.Containers() + if err != nil { + return err + } + for _, c := range containers { + s.containers[c.Info().ID] = c + } + } + return nil } func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) { @@ -41,13 +75,28 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create Options: m.Options, }) } - c, err := s.s.Create(ctx, r.ID, r.Runtime, opts) + runtime, err := s.getRuntime(r.Runtime) if err != nil { return nil, err } + s.mu.Lock() + if _, ok := s.containers[r.ID]; ok { + s.mu.Unlock() + return nil, containerd.ErrContainerExists + } + c, err := runtime.Create(ctx, r.ID, opts) + if err != nil { + s.mu.Unlock() + return nil, err + } + s.containers[r.ID] = c + s.mu.Unlock() state, err := c.State(ctx) if err != nil { - s.s.Delete(ctx, r.ID) + s.mu.Lock() + delete(s.containers, r.ID) + runtime.Delete(ctx, c) + s.mu.Unlock() return nil, err } return &api.CreateResponse{ @@ -57,7 +106,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create } func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) { - c, err := s.s.Get(r.ID) + c, err := s.getContainer(r.ID) if err != nil { return nil, err } @@ -68,7 +117,15 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto } func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_protobuf.Empty, error) { - if err := s.s.Delete(ctx, r.ID); err != nil { + c, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + runtime, err := s.getRuntime(c.Info().Runtime) + if err != nil { + return nil, err + } + if err := runtime.Delete(ctx, c); err != nil { return nil, err } return empty, nil @@ -76,7 +133,7 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_pro func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { resp := &api.ListResponse{} - for _, c := range s.s.Containers() { + for _, c := range s.containers { state, err := c.State(ctx) if err != nil { return nil, err @@ -105,7 +162,25 @@ func (s *Service) Events(r *api.EventsRequest, server api.ContainerService_Event w := &grpcEventWriter{ server: server, } - return s.s.ForwardEvents(w) + return s.collector.forward(w) +} + +func (s *Service) getContainer(id string) (containerd.Container, error) { + s.mu.Lock() + c, ok := s.containers[id] + s.mu.Unlock() + if !ok { + return nil, containerd.ErrContainerNotExist + } + return c, nil +} + +func (s *Service) getRuntime(name string) (containerd.Runtime, error) { + runtime, ok := s.runtimes[name] + if !ok { + return nil, containerd.ErrUnknownRuntime + } + return runtime, nil } type grpcEventWriter struct { diff --git a/supervisor.go b/supervisor.go deleted file mode 100644 index dedd534..0000000 --- a/supervisor.go +++ /dev/null @@ -1,100 +0,0 @@ -package containerd - -import ( - "sync" - - "golang.org/x/net/context" -) - -func NewSupervisor(ctx context.Context, runtimes map[string]Runtime) (*Supervisor, error) { - c, err := newCollector(ctx, runtimes) - if err != nil { - return nil, err - } - s := &Supervisor{ - containers: make(map[string]Container), - runtimes: runtimes, - collector: c, - } - for _, r := range runtimes { - containers, err := r.Containers() - if err != nil { - return nil, err - } - for _, c := range containers { - s.containers[c.Info().ID] = c - } - } - return s, nil -} - -// Supervisor supervises containers and events from multiple runtimes -type Supervisor struct { - mu sync.Mutex - - containers map[string]Container - runtimes map[string]Runtime - collector *collector -} - -// ForwardEvents is a blocking method that will forward all events from the supervisor -// to the EventWriter provided by the caller -func (s *Supervisor) ForwardEvents(w EventWriter) error { - return s.collector.forward(w) -} - -// Create creates a new container with the provided runtime -func (s *Supervisor) Create(ctx context.Context, id, runtime string, opts CreateOpts) (Container, error) { - r, ok := s.runtimes[runtime] - if !ok { - return nil, ErrUnknownRuntime - } - // check to make sure the container's id is unique across the entire system - s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.containers[id]; ok { - return nil, ErrContainerExists - } - c, err := r.Create(ctx, id, opts) - if err != nil { - return nil, err - } - s.containers[c.Info().ID] = c - return c, nil -} - -// Delete deletes the container -func (s *Supervisor) Delete(ctx context.Context, id string) error { - s.mu.Lock() - defer s.mu.Unlock() - c, ok := s.containers[id] - if !ok { - return ErrContainerNotExist - } - err := s.runtimes[c.Info().Runtime].Delete(ctx, c) - if err != nil { - return err - } - delete(s.containers, id) - return nil -} - -// Containers returns all the containers for the supervisor -func (s *Supervisor) Containers() (o []Container) { - s.mu.Lock() - defer s.mu.Unlock() - for _, c := range s.containers { - o = append(o, c) - } - return o -} - -func (s *Supervisor) Get(id string) (Container, error) { - s.mu.Lock() - c, ok := s.containers[id] - s.mu.Unlock() - if !ok { - return nil, ErrContainerNotExist - } - return c, nil -}