Load runtimes dynamically via go1.8 plugins

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add registration for more subsystems via plugins

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Move content service to separate package

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-02-16 14:45:13 -08:00
parent b7805198b1
commit 3101be93bc
25 changed files with 435 additions and 264 deletions

View file

@ -13,7 +13,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd" "github.com/docker/containerd"
shimapi "github.com/docker/containerd/api/services/shim" shimapi "github.com/docker/containerd/api/services/shim"
"github.com/docker/containerd/shim" "github.com/docker/containerd/linux/shim"
"github.com/docker/containerd/sys" "github.com/docker/containerd/sys"
"github.com/docker/containerd/utils" "github.com/docker/containerd/utils"
"github.com/urfave/cli" "github.com/urfave/cli"

View file

@ -0,0 +1,7 @@
package main
// register containerd builtins here
import (
_ "github.com/docker/containerd/services/content"
_ "github.com/docker/containerd/services/execution"
)

View file

@ -18,10 +18,11 @@ func defaultConfig() *config {
// loadConfig loads the config from the provided path // loadConfig loads the config from the provided path
func loadConfig(path string) error { func loadConfig(path string) error {
_, err := toml.DecodeFile(path, conf) md, err := toml.DecodeFile(path, conf)
if err != nil { if err != nil {
return err return err
} }
conf.md = md
return nil return nil
} }
@ -38,6 +39,18 @@ type config struct {
Debug debug `toml:"debug"` Debug debug `toml:"debug"`
// Metrics and monitoring settings // Metrics and monitoring settings
Metrics metricsConfig `toml:"metrics"` Metrics metricsConfig `toml:"metrics"`
// Plugins provides plugin specific configuration for the initialization of a plugin
Plugins map[string]toml.Primitive `toml:"plugins"`
md toml.MetaData
}
func (c *config) decodePlugin(name string, v interface{}) error {
p, ok := c.Plugins[name]
if !ok {
return nil
}
return c.md.PrimitiveDecode(p, v)
} }
type grpcConfig struct { type grpcConfig struct {

View file

@ -16,12 +16,9 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd" "github.com/docker/containerd"
contentapi "github.com/docker/containerd/api/services/content"
api "github.com/docker/containerd/api/services/execution" api "github.com/docker/containerd/api/services/execution"
"github.com/docker/containerd/content" "github.com/docker/containerd/content"
_ "github.com/docker/containerd/linux"
"github.com/docker/containerd/log" "github.com/docker/containerd/log"
"github.com/docker/containerd/services/execution"
"github.com/docker/containerd/utils" "github.com/docker/containerd/utils"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -66,6 +63,10 @@ func main() {
Name: "socket,s", Name: "socket,s",
Usage: "socket path for containerd's GRPC server", Usage: "socket path for containerd's GRPC server",
}, },
cli.StringFlag{
Name: "root",
Usage: "containerd root directory",
},
} }
app.Before = before app.Before = before
app.Action = func(context *cli.Context) error { app.Action = func(context *cli.Context) error {
@ -74,37 +75,41 @@ func main() {
// we don't miss any signals during boot // we don't miss any signals during boot
signals := make(chan os.Signal, 2048) signals := make(chan os.Signal, 2048)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1)
log.G(global).Info("starting containerd boot...") log.G(global).Info("starting containerd boot...")
runtimes, err := loadRuntimes()
if err != nil { // load all plugins into containerd
return err if err := containerd.Load(filepath.Join(conf.Root, "plugins")); err != nil {
}
supervisor, err := containerd.NewSupervisor(log.WithModule(global, "execution"), runtimes)
if err != nil {
return err return err
} }
// start debug and metrics APIs // start debug and metrics APIs
if err := serveDebugAPI(); err != nil { if err := serveDebugAPI(); err != nil {
return err return err
} }
serveMetricsAPI() runtimes, err := loadRuntimes()
if err != nil {
contentStore, err := resolveContentStore() return err
}
store, err := resolveContentStore()
if err != nil {
return err
}
services, err := loadServices(runtimes, store)
if err != nil { if err != nil {
return err return err
} }
// start the GRPC api with the execution service registered // start the GRPC api with the execution service registered
server := newGRPCServer() server := newGRPCServer()
for _, service := range services {
api.RegisterContainerServiceServer(server, execution.New(supervisor)) if err := service.Register(server); err != nil {
contentapi.RegisterContentServer(server, content.NewService(contentStore)) return err
}
// start the GRPC api with registered services }
if err := serveGRPC(server); err != nil { if err := serveGRPC(server); err != nil {
return err return err
} }
// start the prometheus metrics API for containerd
serveMetricsAPI()
log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds()) log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds())
return handleSignals(signals, server) return handleSignals(signals, server)
} }
@ -209,14 +214,28 @@ func resolveContentStore() (*content.Store, error) {
} }
func loadRuntimes() (map[string]containerd.Runtime, error) { func loadRuntimes() (map[string]containerd.Runtime, error) {
o := map[string]containerd.Runtime{} o := make(map[string]containerd.Runtime)
for _, name := range containerd.Runtimes() { for name, rr := range containerd.Registrations() {
r, err := containerd.NewRuntime(name, conf.State) if rr.Type != containerd.RuntimePlugin {
continue
}
log.G(global).Infof("loading runtime plugin %q...", name)
ic := &containerd.InitContext{
Root: conf.Root,
State: conf.State,
Context: log.WithModule(global, fmt.Sprintf("runtime-%s", name)),
}
if rr.Config != nil {
if err := conf.decodePlugin(name, rr.Config); err != nil {
return nil, err
}
ic.Config = rr.Config
}
vr, err := rr.Init(ic)
if err != nil { if err != nil {
return nil, err return nil, err
} }
o[name] = r o[name] = vr.(containerd.Runtime)
log.G(global).WithField("runtime", name).Info("load runtime")
} }
return o, nil return o, nil
} }
@ -226,6 +245,35 @@ func newGRPCServer() *grpc.Server {
return s return s
} }
func loadServices(runtimes map[string]containerd.Runtime, store *content.Store) ([]containerd.Service, error) {
var o []containerd.Service
for name, sr := range containerd.Registrations() {
if sr.Type != containerd.GRPCPlugin {
continue
}
log.G(global).Infof("loading grpc service plugin %q...", name)
ic := &containerd.InitContext{
Root: conf.Root,
State: conf.State,
Context: log.WithModule(global, fmt.Sprintf("service-%s", name)),
Runtimes: runtimes,
Store: store,
}
if sr.Config != nil {
if err := conf.decodePlugin(name, sr.Config); err != nil {
return nil, err
}
ic.Config = sr.Config
}
vs, err := sr.Init(ic)
if err != nil {
return nil, err
}
o = append(o, vs.(containerd.Service))
}
return o, nil
}
func serveGRPC(server *grpc.Server) error { func serveGRPC(server *grpc.Server) error {
path := conf.GRPC.Socket path := conf.GRPC.Socket
if path == "" { if path == "" {

View file

@ -32,3 +32,7 @@ type State interface {
// Pid is the main process id for the container // Pid is the main process id for the container
Pid() uint32 Pid() uint32
} }
type ContainerMonitor interface {
Monitor(context.Context, Container) error
}

View file

@ -13,7 +13,7 @@ import (
var ( var (
errNotFound = errors.New("content: not found") errNotFound = errors.New("content: not found")
bufPool = sync.Pool{ BufPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, 1<<20) return make([]byte, 1<<20)
}, },

View file

@ -33,8 +33,8 @@ func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size i
return errors.Errorf("cannot resume already started write") return errors.Errorf("cannot resume already started write")
} }
buf := bufPool.Get().([]byte) buf := BufPool.Get().([]byte)
defer bufPool.Put(buf) defer BufPool.Put(buf)
nn, err := io.CopyBuffer(cw, r, buf) nn, err := io.CopyBuffer(cw, r, buf)
if err != nil { if err != nil {

View file

@ -221,8 +221,8 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
} }
defer fp.Close() defer fp.Close()
p := bufPool.Get().([]byte) p := BufPool.Get().([]byte)
defer bufPool.Put(p) defer BufPool.Put(p)
offset, err = io.CopyBuffer(digester.Hash(), fp, p) offset, err = io.CopyBuffer(digester.Hash(), fp, p)
if err != nil { if err != nil {

66
design/plugins.md Normal file
View file

@ -0,0 +1,66 @@
# containerd Plugin Model
With go 1.8 we now have dynamically loaded plugins via go packages. This seems to be a very easy and clean way to extent containerd. It does have the drawback of only working on Linux right now but there is where we see the most need for swapping out defaults.
## core
To be extended the core of containerd has to provide go packages and interfaces that can be extended with third-party packages. The core should be small but provide value for people building on top of containerd.
The core should be comprised of the following:
* Snapshotters - Provide way to manage the filesystems of containers and images on a host.
* Runtime - Provide a way to launch containers via the OCI runtime specification.
* Distribution - Provide a way to fetch and push content to external sources/registries.
* Content Store - Provide a generic content addressed store for bridging the gap between registries and snapshotters.
* Metadata - Provide a consistent way for the core and various subsystems to store metadata.
* Monitoring - Provide a way to monitor different subsystems, containers, and operations throughout the core with metrics and events.
### Runtime
The runtime code in the core provides API to create, list, and manage containers on the system. It provides a runtime type that is responsible for creating, deleting, and loading containers.
```go
type Runtime interface {
Create(ctx context.Context, id string, opts CreateOpts) (Container, error)
Containers() ([]Container, error)
Delete(ctx context.Context, c Container) error
}
```
There is a common `Container` interface with common methods for interacting with the container as well as platform specific container interfaces.
```go
type Container interface {
Info() ContainerInfo
Start(context.Context) error
State(context.Context) (State, error)
Events(context.Context) (<-chan ContainerEvent, error)
Kill(context.Context) error
}
type LinuxContainer interface {
Pause(context.Context) error
Resume(context.Context) error
Exec(context.Context, ExecOpts) (uint32, error)
Signal(c context.Context, pid uint32, s os.Signal) error
}
type WindowsContainer interface {
Exec(context.Context, ExecOpts) (uint32, error)
Signal(c context.Context, pid uint32, s os.Signal) error
}
```
### Monitoring
The monitoring subsystem is a way to collect events and metrics from various subsystems.
With the monitoring subsystem you can monitor various types, subsystems, and objects within the core.
This can be use to collect metrics for containers and monitor OOM events when supported.
An example of this is a prometheus monitor that exports container metrics such as cpu, memory, io, and network information.
```go
type ContainerMonitor interface {
Monitor(context.Context, Container) error
}
```

View file

@ -39,32 +39,3 @@ type Event struct {
Pid uint32 Pid uint32
ExitStatus uint32 ExitStatus uint32
} }
type EventWriter interface {
Write(*Event) error
}
type EventFilter func(*Event) bool
// NewFilterEventWriter returns an EventWriter that runs the provided filters on the events.
// If all the filters pass then the event is written to the wrapped EventWriter
func NewFilterEventWriter(w EventWriter, filters ...EventFilter) EventWriter {
return &filteredEventWriter{
w: w,
filters: filters,
}
}
type filteredEventWriter struct {
w EventWriter
filters []EventFilter
}
func (f *filteredEventWriter) Write(e *Event) error {
for _, filter := range f.filters {
if !filter(e) {
return nil
}
}
return f.w.Write(e)
}

7
linux/Makefile Normal file
View file

@ -0,0 +1,7 @@
all:
go build -buildmode=plugin -o shim-linux-amd64.so
install:
mkdir -p /var/lib/containerd/plugins
cp shim-linux-amd64.so /var/lib/containerd/plugins/

View file

@ -1,4 +1,4 @@
package linux package main
import ( import (
"github.com/docker/containerd" "github.com/docker/containerd"

View file

@ -1,4 +1,4 @@
package linux package main
import ( import (
"bytes" "bytes"
@ -24,16 +24,19 @@ const (
) )
func init() { func init() {
containerd.RegisterRuntime(runtimeName, New) containerd.Register(runtimeName, &containerd.Registration{
Type: containerd.RuntimePlugin,
Init: New,
})
} }
func New(root string) (containerd.Runtime, error) { func New(ic *containerd.InitContext) (interface{}, error) {
if err := os.MkdirAll(root, 0700); err != nil { if err := os.MkdirAll(ic.State, 0700); err != nil {
return nil, err return nil, err
} }
c, cancel := context.WithCancel(context.Background()) c, cancel := context.WithCancel(ic.Context)
return &Runtime{ return &Runtime{
root: root, root: ic.State,
events: make(chan *containerd.Event, 2048), events: make(chan *containerd.Event, 2048),
eventsContext: c, eventsContext: c,
eventsCancel: cancel, eventsCancel: cancel,
@ -110,7 +113,7 @@ func (r *Runtime) Containers() ([]containerd.Container, error) {
if !fi.IsDir() { if !fi.IsDir() {
continue continue
} }
c, err := r.loadContainer(fi.Name()) c, err := r.loadContainer(filepath.Join(r.root, fi.Name()))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -1,4 +1,4 @@
package linux package main
import ( import (
"fmt" "fmt"

111
plugin.go Normal file
View file

@ -0,0 +1,111 @@
package containerd
import (
"fmt"
"path/filepath"
"plugin"
"runtime"
"sync"
"github.com/docker/containerd/content"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type PluginType int
const (
RuntimePlugin PluginType = iota + 1
GRPCPlugin
)
type Registration struct {
Type PluginType
Config interface{}
Init func(*InitContext) (interface{}, error)
}
type InitContext struct {
Root string
State string
Runtimes map[string]Runtime
Store *content.Store
Config interface{}
Context context.Context
}
type Service interface {
Register(*grpc.Server) error
}
var register = struct {
sync.Mutex
r map[string]*Registration
}{
r: make(map[string]*Registration),
}
// Load loads all plugins at the provided that into containerd
func Load(path string) (err error) {
defer func() {
if v := recover(); v != nil {
rerr, ok := v.(error)
if !ok {
panic(v)
}
err = rerr
}
}()
return loadPlugins(path)
}
func Register(name string, r *Registration) error {
register.Lock()
defer register.Unlock()
if _, ok := register.r[name]; ok {
return fmt.Errorf("plugin already registered as %q", name)
}
register.r[name] = r
return nil
}
func Registrations() map[string]*Registration {
return register.r
}
// loadPlugins loads all plugins for the OS and Arch
// that containerd is built for inside the provided path
func loadPlugins(path string) error {
abs, err := filepath.Abs(path)
if err != nil {
return err
}
pattern := filepath.Join(abs, fmt.Sprintf(
"*-%s-%s.%s",
runtime.GOOS,
runtime.GOARCH,
getLibExt(),
))
libs, err := filepath.Glob(pattern)
if err != nil {
return err
}
for _, lib := range libs {
if _, err := plugin.Open(lib); err != nil {
return err
}
}
return nil
}
// getLibExt returns a platform specific lib extension for
// the platform that containerd is running on
func getLibExt() string {
switch runtime.GOOS {
case "windows":
return "dll"
default:
return "so"
}
}

View file

@ -1,53 +1,6 @@
package containerd package containerd
import ( import "golang.org/x/net/context"
"fmt"
"sync"
"golang.org/x/net/context"
)
// NewRuntimeFunc is the runtime's constructor
type NewRuntimeFunc func(root string) (Runtime, error)
var runtimeRegistration = struct {
mu sync.Mutex
runtimes map[string]NewRuntimeFunc
}{
runtimes: make(map[string]NewRuntimeFunc),
}
// RegisterRuntime is not external packages registers Runtimes for use with containerd
func RegisterRuntime(name string, f NewRuntimeFunc) {
runtimeRegistration.mu.Lock()
defer runtimeRegistration.mu.Unlock()
if _, ok := runtimeRegistration.runtimes[name]; ok {
panic(fmt.Errorf("runtime already registered as %q", name))
}
runtimeRegistration.runtimes[name] = f
}
// Runtimes returns a slice of all registered runtime names for containerd
func Runtimes() (o []string) {
runtimeRegistration.mu.Lock()
defer runtimeRegistration.mu.Unlock()
for k := range runtimeRegistration.runtimes {
o = append(o, k)
}
return o
}
// NewRuntime calls the runtime's constructor with the provided root
func NewRuntime(name, root string) (Runtime, error) {
runtimeRegistration.mu.Lock()
defer runtimeRegistration.mu.Unlock()
f, ok := runtimeRegistration.runtimes[name]
if !ok {
return nil, ErrRuntimeNotExist
}
return f(root)
}
type IO struct { type IO struct {
Stdin string Stdin string
@ -73,7 +26,7 @@ type Runtime interface {
// Containers returns all the current containers for the runtime // Containers returns all the current containers for the runtime
Containers() ([]Container, error) Containers() ([]Container, error)
// Delete removes the container in the runtime // Delete removes the container in the runtime
Delete(ctx context.Context, c Container) error Delete(context.Context, Container) error
// Events returns events for the runtime and all containers created by the runtime // Events returns events for the runtime and all containers created by the runtime
Events(context.Context) <-chan *Event Events(context.Context) <-chan *Event
} }

View file

@ -4,23 +4,39 @@ import (
"errors" "errors"
"io" "io"
contentapi "github.com/docker/containerd/api/services/content" "github.com/docker/containerd"
api "github.com/docker/containerd/api/services/content"
"github.com/docker/containerd/content"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
) )
type Service struct { type Service struct {
store *Store store *content.Store
} }
var _ contentapi.ContentServer = &Service{} var _ api.ContentServer = &Service{}
func NewService(store *Store) contentapi.ContentServer { func init() {
return &Service{store: store} containerd.Register("content-grpc", &containerd.Registration{
Type: containerd.GRPCPlugin,
Init: NewService,
})
} }
func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) { func NewService(ic *containerd.InitContext) (interface{}, error) {
return &Service{
store: ic.Store,
}, nil
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContentServer(server, s)
return nil
}
func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
if err := req.Digest.Validate(); err != nil { if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest) return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
} }
@ -30,14 +46,14 @@ func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*conte
return nil, maybeNotFoundGRPC(err, req.Digest.String()) return nil, maybeNotFoundGRPC(err, req.Digest.String())
} }
return &contentapi.InfoResponse{ return &api.InfoResponse{
Digest: req.Digest, Digest: req.Digest,
Size_: bi.Size, Size_: bi.Size,
CommittedAt: bi.CommittedAt, CommittedAt: bi.CommittedAt,
}, nil }, nil
} }
func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_ReadServer) error { func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) error {
if err := req.Digest.Validate(); err != nil { if err := req.Digest.Validate(); err != nil {
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err) return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
} }
@ -68,9 +84,9 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R
// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably
// little inefficient for work over a fast network. We can tune this later. // little inefficient for work over a fast network. We can tune this later.
p = bufPool.Get().([]byte) p = content.BufPool.Get().([]byte)
) )
defer bufPool.Put(p) defer content.BufPool.Put(p)
if offset < 0 { if offset < 0 {
offset = 0 offset = 0
@ -95,11 +111,11 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R
type readResponseWriter struct { type readResponseWriter struct {
offset int64 offset int64
session contentapi.Content_ReadServer session api.Content_ReadServer
} }
func (rw *readResponseWriter) Write(p []byte) (n int, err error) { func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
if err := rw.session.Send(&contentapi.ReadResponse{ if err := rw.session.Send(&api.ReadResponse{
Offset: rw.offset, Offset: rw.offset,
Data: p, Data: p,
}); err != nil { }); err != nil {
@ -110,14 +126,14 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
return len(p), nil return len(p), nil
} }
func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { func (s *Service) Write(session api.Content_WriteServer) (err error) {
var ( var (
ref string ref string
msg contentapi.WriteResponse msg api.WriteResponse
req *contentapi.WriteRequest req *api.WriteRequest
) )
defer func(msg *contentapi.WriteResponse) { defer func(msg *api.WriteResponse) {
// pump through the last message if no error was encountered // pump through the last message if no error was encountered
if err != nil { if err != nil {
return return
@ -153,7 +169,7 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
// cost of the move when they collide. // cost of the move when they collide.
if req.ExpectedDigest != "" { if req.ExpectedDigest != "" {
if _, err := s.store.Info(req.ExpectedDigest); err != nil { if _, err := s.store.Info(req.ExpectedDigest); err != nil {
if !IsNotFound(err) { if !content.IsNotFound(err) {
return err return err
} }
@ -172,9 +188,9 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
msg.UpdatedAt = ws.UpdatedAt msg.UpdatedAt = ws.UpdatedAt
switch req.Action { switch req.Action {
case contentapi.WriteActionStat: case api.WriteActionStat:
msg.Digest = wr.Digest() msg.Digest = wr.Digest()
case contentapi.WriteActionWrite, contentapi.WriteActionCommit: case api.WriteActionWrite, api.WriteActionCommit:
if req.Offset > 0 { if req.Offset > 0 {
// validate the offset if provided // validate the offset if provided
if req.Offset != ws.Offset { if req.Offset != ws.Offset {
@ -200,10 +216,10 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
msg.Offset += int64(n) msg.Offset += int64(n)
} }
if req.Action == contentapi.WriteActionCommit { if req.Action == api.WriteActionCommit {
return wr.Commit(req.ExpectedSize, req.ExpectedDigest) return wr.Commit(req.ExpectedSize, req.ExpectedDigest)
} }
case contentapi.WriteActionAbort: case api.WriteActionAbort:
return s.store.Abort(ref) return s.store.Abort(ref)
} }
@ -220,12 +236,12 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
return nil return nil
} }
func (s *Service) Status(*contentapi.StatusRequest, contentapi.Content_StatusServer) error { func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
return grpc.Errorf(codes.Unimplemented, "not implemented") return grpc.Errorf(codes.Unimplemented, "not implemented")
} }
func maybeNotFoundGRPC(err error, id string) error { func maybeNotFoundGRPC(err error, id string) error {
if IsNotFound(err) { if content.IsNotFound(err) {
return grpc.Errorf(codes.NotFound, "%v: not found", id) return grpc.Errorf(codes.NotFound, "%v: not found", id)
} }

View file

@ -1,15 +1,17 @@
package containerd package execution
import ( import (
"sync" "sync"
"github.com/docker/containerd"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector, error) { func newCollector(ctx context.Context, runtimes map[string]containerd.Runtime) (*collector, error) {
c := &collector{ c := &collector{
context: ctx, context: ctx,
ch: make(chan *Event, 2048), ch: make(chan *containerd.Event, 2048),
eventClients: make(map[*eventClient]struct{}), eventClients: make(map[*eventClient]struct{}),
} }
for _, r := range runtimes { for _, r := range runtimes {
@ -27,7 +29,7 @@ func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector,
type eventClient struct { type eventClient struct {
eCh chan error eCh chan error
w EventWriter w *grpcEventWriter
} }
type collector struct { type collector struct {
@ -35,12 +37,12 @@ type collector struct {
wg sync.WaitGroup wg sync.WaitGroup
context context.Context context context.Context
ch chan *Event ch chan *containerd.Event
eventClients map[*eventClient]struct{} eventClients map[*eventClient]struct{}
} }
// collect collects events from the provided runtime // collect collects events from the provided runtime
func (c *collector) collect(r Runtime) error { func (c *collector) collect(r containerd.Runtime) error {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
defer c.wg.Done() defer c.wg.Done()
@ -51,12 +53,7 @@ func (c *collector) collect(r Runtime) error {
return nil return nil
} }
// Forward forwards all events from the collector to the EventWriters func (c *collector) forward(w *grpcEventWriter) error {
//
// It forwards events until the channels are closed or the EventWriter
// returns an error
// This is a blocking call
func (c *collector) forward(w EventWriter) error {
client := &eventClient{ client := &eventClient{
w: w, w: w,
eCh: make(chan error, 1), eCh: make(chan error, 1),

View file

@ -1,11 +1,14 @@
package execution package execution
import ( import (
"sync"
"github.com/docker/containerd" "github.com/docker/containerd"
api "github.com/docker/containerd/api/services/execution" api "github.com/docker/containerd/api/services/execution"
"github.com/docker/containerd/api/types/container" "github.com/docker/containerd/api/types/container"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
var ( var (
@ -13,15 +16,46 @@ var (
empty = &google_protobuf.Empty{} empty = &google_protobuf.Empty{}
) )
// New creates a new GRPC service for the ContainerService func init() {
func New(s *containerd.Supervisor) *Service { containerd.Register("runtime-grpc", &containerd.Registration{
return &Service{ Type: containerd.GRPCPlugin,
s: s, Init: New,
})
}
func New(ic *containerd.InitContext) (interface{}, error) {
c, err := newCollector(ic.Context, ic.Runtimes)
if err != nil {
return nil, err
} }
return &Service{
runtimes: ic.Runtimes,
containers: make(map[string]containerd.Container),
collector: c,
}, nil
} }
type Service struct { type Service struct {
s *containerd.Supervisor mu sync.Mutex
runtimes map[string]containerd.Runtime
containers map[string]containerd.Container
collector *collector
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContainerServiceServer(server, s)
// load all containers
for _, r := range s.runtimes {
containers, err := r.Containers()
if err != nil {
return err
}
for _, c := range containers {
s.containers[c.Info().ID] = c
}
}
return nil
} }
func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) { func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
@ -41,13 +75,28 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
Options: m.Options, Options: m.Options,
}) })
} }
c, err := s.s.Create(ctx, r.ID, r.Runtime, opts) runtime, err := s.getRuntime(r.Runtime)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.mu.Lock()
if _, ok := s.containers[r.ID]; ok {
s.mu.Unlock()
return nil, containerd.ErrContainerExists
}
c, err := runtime.Create(ctx, r.ID, opts)
if err != nil {
s.mu.Unlock()
return nil, err
}
s.containers[r.ID] = c
s.mu.Unlock()
state, err := c.State(ctx) state, err := c.State(ctx)
if err != nil { if err != nil {
s.s.Delete(ctx, r.ID) s.mu.Lock()
delete(s.containers, r.ID)
runtime.Delete(ctx, c)
s.mu.Unlock()
return nil, err return nil, err
} }
return &api.CreateResponse{ return &api.CreateResponse{
@ -57,7 +106,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
} }
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) { func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
c, err := s.s.Get(r.ID) c, err := s.getContainer(r.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -68,7 +117,15 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
} }
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_protobuf.Empty, error) { func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_protobuf.Empty, error) {
if err := s.s.Delete(ctx, r.ID); err != nil { c, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
runtime, err := s.getRuntime(c.Info().Runtime)
if err != nil {
return nil, err
}
if err := runtime.Delete(ctx, c); err != nil {
return nil, err return nil, err
} }
return empty, nil return empty, nil
@ -76,7 +133,7 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_pro
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
resp := &api.ListResponse{} resp := &api.ListResponse{}
for _, c := range s.s.Containers() { for _, c := range s.containers {
state, err := c.State(ctx) state, err := c.State(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -105,7 +162,25 @@ func (s *Service) Events(r *api.EventsRequest, server api.ContainerService_Event
w := &grpcEventWriter{ w := &grpcEventWriter{
server: server, server: server,
} }
return s.s.ForwardEvents(w) return s.collector.forward(w)
}
func (s *Service) getContainer(id string) (containerd.Container, error) {
s.mu.Lock()
c, ok := s.containers[id]
s.mu.Unlock()
if !ok {
return nil, containerd.ErrContainerNotExist
}
return c, nil
}
func (s *Service) getRuntime(name string) (containerd.Runtime, error) {
runtime, ok := s.runtimes[name]
if !ok {
return nil, containerd.ErrUnknownRuntime
}
return runtime, nil
} }
type grpcEventWriter struct { type grpcEventWriter struct {

View file

@ -1,100 +0,0 @@
package containerd
import (
"sync"
"golang.org/x/net/context"
)
func NewSupervisor(ctx context.Context, runtimes map[string]Runtime) (*Supervisor, error) {
c, err := newCollector(ctx, runtimes)
if err != nil {
return nil, err
}
s := &Supervisor{
containers: make(map[string]Container),
runtimes: runtimes,
collector: c,
}
for _, r := range runtimes {
containers, err := r.Containers()
if err != nil {
return nil, err
}
for _, c := range containers {
s.containers[c.Info().ID] = c
}
}
return s, nil
}
// Supervisor supervises containers and events from multiple runtimes
type Supervisor struct {
mu sync.Mutex
containers map[string]Container
runtimes map[string]Runtime
collector *collector
}
// ForwardEvents is a blocking method that will forward all events from the supervisor
// to the EventWriter provided by the caller
func (s *Supervisor) ForwardEvents(w EventWriter) error {
return s.collector.forward(w)
}
// Create creates a new container with the provided runtime
func (s *Supervisor) Create(ctx context.Context, id, runtime string, opts CreateOpts) (Container, error) {
r, ok := s.runtimes[runtime]
if !ok {
return nil, ErrUnknownRuntime
}
// check to make sure the container's id is unique across the entire system
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.containers[id]; ok {
return nil, ErrContainerExists
}
c, err := r.Create(ctx, id, opts)
if err != nil {
return nil, err
}
s.containers[c.Info().ID] = c
return c, nil
}
// Delete deletes the container
func (s *Supervisor) Delete(ctx context.Context, id string) error {
s.mu.Lock()
defer s.mu.Unlock()
c, ok := s.containers[id]
if !ok {
return ErrContainerNotExist
}
err := s.runtimes[c.Info().Runtime].Delete(ctx, c)
if err != nil {
return err
}
delete(s.containers, id)
return nil
}
// Containers returns all the containers for the supervisor
func (s *Supervisor) Containers() (o []Container) {
s.mu.Lock()
defer s.mu.Unlock()
for _, c := range s.containers {
o = append(o, c)
}
return o
}
func (s *Supervisor) Get(id string) (Container, error) {
s.mu.Lock()
c, ok := s.containers[id]
s.mu.Unlock()
if !ok {
return nil, ErrContainerNotExist
}
return c, nil
}