Merge pull request #556 from crosbymichael/plugins
Use go 1.8 plugins for extending core functionality
This commit is contained in:
commit
0a5544d8c4
27 changed files with 410 additions and 265 deletions
|
@ -4,7 +4,6 @@ sudo: required
|
|||
language: go
|
||||
|
||||
go:
|
||||
- 1.7.x
|
||||
- 1.8.x
|
||||
- tip
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/containerd"
|
||||
shimapi "github.com/docker/containerd/api/services/shim"
|
||||
"github.com/docker/containerd/shim"
|
||||
"github.com/docker/containerd/linux/shim"
|
||||
"github.com/docker/containerd/sys"
|
||||
"github.com/docker/containerd/utils"
|
||||
"github.com/urfave/cli"
|
||||
|
|
7
cmd/containerd/builtins.go
Normal file
7
cmd/containerd/builtins.go
Normal file
|
@ -0,0 +1,7 @@
|
|||
package main
|
||||
|
||||
// register containerd builtins here
|
||||
import (
|
||||
_ "github.com/docker/containerd/services/content"
|
||||
_ "github.com/docker/containerd/services/execution"
|
||||
)
|
|
@ -18,10 +18,11 @@ func defaultConfig() *config {
|
|||
|
||||
// loadConfig loads the config from the provided path
|
||||
func loadConfig(path string) error {
|
||||
_, err := toml.DecodeFile(path, conf)
|
||||
md, err := toml.DecodeFile(path, conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conf.md = md
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -38,6 +39,18 @@ type config struct {
|
|||
Debug debug `toml:"debug"`
|
||||
// Metrics and monitoring settings
|
||||
Metrics metricsConfig `toml:"metrics"`
|
||||
// Plugins provides plugin specific configuration for the initialization of a plugin
|
||||
Plugins map[string]toml.Primitive `toml:"plugins"`
|
||||
|
||||
md toml.MetaData
|
||||
}
|
||||
|
||||
func (c *config) decodePlugin(name string, v interface{}) error {
|
||||
p, ok := c.Plugins[name]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return c.md.PrimitiveDecode(p, v)
|
||||
}
|
||||
|
||||
type grpcConfig struct {
|
||||
|
|
|
@ -16,12 +16,9 @@ import (
|
|||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/containerd"
|
||||
contentapi "github.com/docker/containerd/api/services/content"
|
||||
api "github.com/docker/containerd/api/services/execution"
|
||||
"github.com/docker/containerd/content"
|
||||
_ "github.com/docker/containerd/linux"
|
||||
"github.com/docker/containerd/log"
|
||||
"github.com/docker/containerd/services/execution"
|
||||
"github.com/docker/containerd/utils"
|
||||
metrics "github.com/docker/go-metrics"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -66,6 +63,10 @@ func main() {
|
|||
Name: "socket,s",
|
||||
Usage: "socket path for containerd's GRPC server",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "root",
|
||||
Usage: "containerd root directory",
|
||||
},
|
||||
}
|
||||
app.Before = before
|
||||
app.Action = func(context *cli.Context) error {
|
||||
|
@ -74,37 +75,41 @@ func main() {
|
|||
// we don't miss any signals during boot
|
||||
signals := make(chan os.Signal, 2048)
|
||||
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1)
|
||||
|
||||
log.G(global).Info("starting containerd boot...")
|
||||
runtimes, err := loadRuntimes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
supervisor, err := containerd.NewSupervisor(log.WithModule(global, "execution"), runtimes)
|
||||
if err != nil {
|
||||
|
||||
// load all plugins into containerd
|
||||
if err := containerd.Load(filepath.Join(conf.Root, "plugins")); err != nil {
|
||||
return err
|
||||
}
|
||||
// start debug and metrics APIs
|
||||
if err := serveDebugAPI(); err != nil {
|
||||
return err
|
||||
}
|
||||
serveMetricsAPI()
|
||||
|
||||
contentStore, err := resolveContentStore()
|
||||
runtimes, err := loadRuntimes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
store, err := resolveContentStore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
services, err := loadServices(runtimes, store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start the GRPC api with the execution service registered
|
||||
server := newGRPCServer()
|
||||
|
||||
api.RegisterContainerServiceServer(server, execution.New(supervisor))
|
||||
contentapi.RegisterContentServer(server, content.NewService(contentStore))
|
||||
|
||||
// start the GRPC api with registered services
|
||||
for _, service := range services {
|
||||
if err := service.Register(server); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := serveGRPC(server); err != nil {
|
||||
return err
|
||||
}
|
||||
// start the prometheus metrics API for containerd
|
||||
serveMetricsAPI()
|
||||
|
||||
log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds())
|
||||
return handleSignals(signals, server)
|
||||
}
|
||||
|
@ -209,14 +214,28 @@ func resolveContentStore() (*content.Store, error) {
|
|||
}
|
||||
|
||||
func loadRuntimes() (map[string]containerd.Runtime, error) {
|
||||
o := map[string]containerd.Runtime{}
|
||||
for _, name := range containerd.Runtimes() {
|
||||
r, err := containerd.NewRuntime(name, conf.State)
|
||||
o := make(map[string]containerd.Runtime)
|
||||
for name, rr := range containerd.Registrations() {
|
||||
if rr.Type != containerd.RuntimePlugin {
|
||||
continue
|
||||
}
|
||||
log.G(global).Infof("loading runtime plugin %q...", name)
|
||||
ic := &containerd.InitContext{
|
||||
Root: conf.Root,
|
||||
State: conf.State,
|
||||
Context: log.WithModule(global, fmt.Sprintf("runtime-%s", name)),
|
||||
}
|
||||
if rr.Config != nil {
|
||||
if err := conf.decodePlugin(name, rr.Config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ic.Config = rr.Config
|
||||
}
|
||||
vr, err := rr.Init(ic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o[name] = r
|
||||
log.G(global).WithField("runtime", name).Info("load runtime")
|
||||
o[name] = vr.(containerd.Runtime)
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
@ -226,6 +245,35 @@ func newGRPCServer() *grpc.Server {
|
|||
return s
|
||||
}
|
||||
|
||||
func loadServices(runtimes map[string]containerd.Runtime, store *content.Store) ([]containerd.Service, error) {
|
||||
var o []containerd.Service
|
||||
for name, sr := range containerd.Registrations() {
|
||||
if sr.Type != containerd.GRPCPlugin {
|
||||
continue
|
||||
}
|
||||
log.G(global).Infof("loading grpc service plugin %q...", name)
|
||||
ic := &containerd.InitContext{
|
||||
Root: conf.Root,
|
||||
State: conf.State,
|
||||
Context: log.WithModule(global, fmt.Sprintf("service-%s", name)),
|
||||
Runtimes: runtimes,
|
||||
Store: store,
|
||||
}
|
||||
if sr.Config != nil {
|
||||
if err := conf.decodePlugin(name, sr.Config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ic.Config = sr.Config
|
||||
}
|
||||
vs, err := sr.Init(ic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o = append(o, vs.(containerd.Service))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func serveGRPC(server *grpc.Server) error {
|
||||
path := conf.GRPC.Socket
|
||||
if path == "" {
|
||||
|
|
|
@ -32,3 +32,7 @@ type State interface {
|
|||
// Pid is the main process id for the container
|
||||
Pid() uint32
|
||||
}
|
||||
|
||||
type ContainerMonitor interface {
|
||||
Monitor(context.Context, Container) error
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
var (
|
||||
errNotFound = errors.New("content: not found")
|
||||
|
||||
bufPool = sync.Pool{
|
||||
BufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 1<<20)
|
||||
},
|
||||
|
|
|
@ -33,8 +33,8 @@ func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size i
|
|||
return errors.Errorf("cannot resume already started write")
|
||||
}
|
||||
|
||||
buf := bufPool.Get().([]byte)
|
||||
defer bufPool.Put(buf)
|
||||
buf := BufPool.Get().([]byte)
|
||||
defer BufPool.Put(buf)
|
||||
|
||||
nn, err := io.CopyBuffer(cw, r, buf)
|
||||
if err != nil {
|
||||
|
|
|
@ -221,8 +221,8 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
|||
}
|
||||
defer fp.Close()
|
||||
|
||||
p := bufPool.Get().([]byte)
|
||||
defer bufPool.Put(p)
|
||||
p := BufPool.Get().([]byte)
|
||||
defer BufPool.Put(p)
|
||||
|
||||
offset, err = io.CopyBuffer(digester.Hash(), fp, p)
|
||||
if err != nil {
|
||||
|
|
33
design/plugins.md
Normal file
33
design/plugins.md
Normal file
|
@ -0,0 +1,33 @@
|
|||
# containerd Plugin Model
|
||||
|
||||
With go 1.8 we now have dynamically loaded plugins via go packages. This seems to be a very easy and clean way to extend containerd. It does have the drawback of only working on Linux right now but this is where we see the most need for swapping out defaults.
|
||||
|
||||
## core
|
||||
|
||||
To be extended the core of containerd has to provide go packages and interfaces that can be extended with third-party packages. The core should be small but provide value for people building on top of containerd.
|
||||
|
||||
The core should be comprised of the following:
|
||||
|
||||
* Snapshotters - Provide way to manage the filesystems of containers and images on a host.
|
||||
* Runtime - Provide a way to launch containers via the OCI runtime specification.
|
||||
* Distribution - Provide a way to fetch and push content to external sources/registries.
|
||||
* Content Store - Provide a generic content addressed store for bridging the gap between registries and snapshotters.
|
||||
* Metadata - Provide a consistent way for the core and various subsystems to store metadata.
|
||||
* Monitoring - Provide a way to monitor different subsystems, containers, and operations throughout the core with metrics and events.
|
||||
|
||||
### Runtime
|
||||
|
||||
The runtime code in the core provides API to create, list, and manage containers on the system. It provides a runtime type that is responsible for creating, deleting, and loading containers.
|
||||
|
||||
### Monitoring
|
||||
|
||||
The monitoring subsystem is a way to collect events and metrics from various subsystems.
|
||||
With the monitoring subsystem you can monitor various types, subsystems, and objects within the core.
|
||||
This can be used to collect metrics for containers and monitor OOM events when supported.
|
||||
An example of this is a prometheus monitor that exports container metrics such as cpu, memory, io, and network information.
|
||||
|
||||
```go
|
||||
type ContainerMonitor interface {
|
||||
Monitor(context.Context, Container) error
|
||||
}
|
||||
```
|
29
event.go
29
event.go
|
@ -39,32 +39,3 @@ type Event struct {
|
|||
Pid uint32
|
||||
ExitStatus uint32
|
||||
}
|
||||
|
||||
type EventWriter interface {
|
||||
Write(*Event) error
|
||||
}
|
||||
|
||||
type EventFilter func(*Event) bool
|
||||
|
||||
// NewFilterEventWriter returns an EventWriter that runs the provided filters on the events.
|
||||
// If all the filters pass then the event is written to the wrapped EventWriter
|
||||
func NewFilterEventWriter(w EventWriter, filters ...EventFilter) EventWriter {
|
||||
return &filteredEventWriter{
|
||||
w: w,
|
||||
filters: filters,
|
||||
}
|
||||
}
|
||||
|
||||
type filteredEventWriter struct {
|
||||
w EventWriter
|
||||
filters []EventFilter
|
||||
}
|
||||
|
||||
func (f *filteredEventWriter) Write(e *Event) error {
|
||||
for _, filter := range f.filters {
|
||||
if !filter(e) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return f.w.Write(e)
|
||||
}
|
||||
|
|
7
linux/Makefile
Normal file
7
linux/Makefile
Normal file
|
@ -0,0 +1,7 @@
|
|||
|
||||
all:
|
||||
go build -buildmode=plugin -o shim-linux-amd64.so
|
||||
|
||||
install:
|
||||
mkdir -p /var/lib/containerd/plugins
|
||||
cp shim-linux-amd64.so /var/lib/containerd/plugins/
|
|
@ -1,4 +1,4 @@
|
|||
package linux
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/docker/containerd"
|
||||
|
|
5
linux/main.go
Normal file
5
linux/main.go
Normal file
|
@ -0,0 +1,5 @@
|
|||
package main
|
||||
|
||||
func main() {
|
||||
panic("build as plugin not executable")
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package linux
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -24,16 +24,20 @@ const (
|
|||
)
|
||||
|
||||
func init() {
|
||||
containerd.RegisterRuntime(runtimeName, New)
|
||||
containerd.Register(runtimeName, &containerd.Registration{
|
||||
Type: containerd.RuntimePlugin,
|
||||
Init: New,
|
||||
})
|
||||
}
|
||||
|
||||
func New(root string) (containerd.Runtime, error) {
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
func New(ic *containerd.InitContext) (interface{}, error) {
|
||||
path := filepath.Join(ic.State, runtimeName)
|
||||
if err := os.MkdirAll(path, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, cancel := context.WithCancel(context.Background())
|
||||
c, cancel := context.WithCancel(ic.Context)
|
||||
return &Runtime{
|
||||
root: root,
|
||||
root: path,
|
||||
events: make(chan *containerd.Event, 2048),
|
||||
eventsContext: c,
|
||||
eventsCancel: cancel,
|
||||
|
@ -110,7 +114,7 @@ func (r *Runtime) Containers() ([]containerd.Container, error) {
|
|||
if !fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
c, err := r.loadContainer(fi.Name())
|
||||
c, err := r.loadContainer(filepath.Join(r.root, fi.Name()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package linux
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
|
111
plugin.go
Normal file
111
plugin.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package containerd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"plugin"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type PluginType int
|
||||
|
||||
const (
|
||||
RuntimePlugin PluginType = iota + 1
|
||||
GRPCPlugin
|
||||
)
|
||||
|
||||
type Registration struct {
|
||||
Type PluginType
|
||||
Config interface{}
|
||||
Init func(*InitContext) (interface{}, error)
|
||||
}
|
||||
|
||||
type InitContext struct {
|
||||
Root string
|
||||
State string
|
||||
Runtimes map[string]Runtime
|
||||
Store *content.Store
|
||||
Config interface{}
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
|
||||
var register = struct {
|
||||
sync.Mutex
|
||||
r map[string]*Registration
|
||||
}{
|
||||
r: make(map[string]*Registration),
|
||||
}
|
||||
|
||||
// Load loads all plugins at the provided path into containerd
|
||||
func Load(path string) (err error) {
|
||||
defer func() {
|
||||
if v := recover(); v != nil {
|
||||
rerr, ok := v.(error)
|
||||
if !ok {
|
||||
rerr = fmt.Errorf("%s", v)
|
||||
}
|
||||
err = rerr
|
||||
}
|
||||
}()
|
||||
return loadPlugins(path)
|
||||
}
|
||||
|
||||
func Register(name string, r *Registration) error {
|
||||
register.Lock()
|
||||
defer register.Unlock()
|
||||
if _, ok := register.r[name]; ok {
|
||||
return fmt.Errorf("plugin already registered as %q", name)
|
||||
}
|
||||
register.r[name] = r
|
||||
return nil
|
||||
}
|
||||
|
||||
func Registrations() map[string]*Registration {
|
||||
return register.r
|
||||
}
|
||||
|
||||
// loadPlugins loads all plugins for the OS and Arch
|
||||
// that containerd is built for inside the provided path
|
||||
func loadPlugins(path string) error {
|
||||
abs, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pattern := filepath.Join(abs, fmt.Sprintf(
|
||||
"*-%s-%s.%s",
|
||||
runtime.GOOS,
|
||||
runtime.GOARCH,
|
||||
getLibExt(),
|
||||
))
|
||||
libs, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lib := range libs {
|
||||
if _, err := plugin.Open(lib); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getLibExt returns a platform specific lib extension for
|
||||
// the platform that containerd is running on
|
||||
func getLibExt() string {
|
||||
switch runtime.GOOS {
|
||||
case "windows":
|
||||
return "dll"
|
||||
default:
|
||||
return "so"
|
||||
}
|
||||
}
|
51
runtime.go
51
runtime.go
|
@ -1,53 +1,6 @@
|
|||
package containerd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// NewRuntimeFunc is the runtime's constructor
|
||||
type NewRuntimeFunc func(root string) (Runtime, error)
|
||||
|
||||
var runtimeRegistration = struct {
|
||||
mu sync.Mutex
|
||||
runtimes map[string]NewRuntimeFunc
|
||||
}{
|
||||
runtimes: make(map[string]NewRuntimeFunc),
|
||||
}
|
||||
|
||||
// RegisterRuntime is not external packages registers Runtimes for use with containerd
|
||||
func RegisterRuntime(name string, f NewRuntimeFunc) {
|
||||
runtimeRegistration.mu.Lock()
|
||||
defer runtimeRegistration.mu.Unlock()
|
||||
if _, ok := runtimeRegistration.runtimes[name]; ok {
|
||||
panic(fmt.Errorf("runtime already registered as %q", name))
|
||||
}
|
||||
runtimeRegistration.runtimes[name] = f
|
||||
}
|
||||
|
||||
// Runtimes returns a slice of all registered runtime names for containerd
|
||||
func Runtimes() (o []string) {
|
||||
runtimeRegistration.mu.Lock()
|
||||
defer runtimeRegistration.mu.Unlock()
|
||||
|
||||
for k := range runtimeRegistration.runtimes {
|
||||
o = append(o, k)
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
// NewRuntime calls the runtime's constructor with the provided root
|
||||
func NewRuntime(name, root string) (Runtime, error) {
|
||||
runtimeRegistration.mu.Lock()
|
||||
defer runtimeRegistration.mu.Unlock()
|
||||
f, ok := runtimeRegistration.runtimes[name]
|
||||
if !ok {
|
||||
return nil, ErrRuntimeNotExist
|
||||
}
|
||||
return f(root)
|
||||
}
|
||||
import "golang.org/x/net/context"
|
||||
|
||||
type IO struct {
|
||||
Stdin string
|
||||
|
@ -73,7 +26,7 @@ type Runtime interface {
|
|||
// Containers returns all the current containers for the runtime
|
||||
Containers() ([]Container, error)
|
||||
// Delete removes the container in the runtime
|
||||
Delete(ctx context.Context, c Container) error
|
||||
Delete(context.Context, Container) error
|
||||
// Events returns events for the runtime and all containers created by the runtime
|
||||
Events(context.Context) <-chan *Event
|
||||
}
|
||||
|
|
|
@ -4,23 +4,39 @@ import (
|
|||
"errors"
|
||||
"io"
|
||||
|
||||
contentapi "github.com/docker/containerd/api/services/content"
|
||||
"github.com/docker/containerd"
|
||||
api "github.com/docker/containerd/api/services/content"
|
||||
"github.com/docker/containerd/content"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
store *Store
|
||||
store *content.Store
|
||||
}
|
||||
|
||||
var _ contentapi.ContentServer = &Service{}
|
||||
var _ api.ContentServer = &Service{}
|
||||
|
||||
func NewService(store *Store) contentapi.ContentServer {
|
||||
return &Service{store: store}
|
||||
func init() {
|
||||
containerd.Register("content-grpc", &containerd.Registration{
|
||||
Type: containerd.GRPCPlugin,
|
||||
Init: NewService,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) {
|
||||
func NewService(ic *containerd.InitContext) (interface{}, error) {
|
||||
return &Service{
|
||||
store: ic.Store,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Register(server *grpc.Server) error {
|
||||
api.RegisterContentServer(server, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
|
||||
if err := req.Digest.Validate(); err != nil {
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
|
||||
}
|
||||
|
@ -30,14 +46,14 @@ func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*conte
|
|||
return nil, maybeNotFoundGRPC(err, req.Digest.String())
|
||||
}
|
||||
|
||||
return &contentapi.InfoResponse{
|
||||
return &api.InfoResponse{
|
||||
Digest: req.Digest,
|
||||
Size_: bi.Size,
|
||||
CommittedAt: bi.CommittedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_ReadServer) error {
|
||||
func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) error {
|
||||
if err := req.Digest.Validate(); err != nil {
|
||||
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
|
||||
}
|
||||
|
@ -68,9 +84,9 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R
|
|||
|
||||
// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably
|
||||
// little inefficient for work over a fast network. We can tune this later.
|
||||
p = bufPool.Get().([]byte)
|
||||
p = content.BufPool.Get().([]byte)
|
||||
)
|
||||
defer bufPool.Put(p)
|
||||
defer content.BufPool.Put(p)
|
||||
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
|
@ -95,11 +111,11 @@ func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_R
|
|||
|
||||
type readResponseWriter struct {
|
||||
offset int64
|
||||
session contentapi.Content_ReadServer
|
||||
session api.Content_ReadServer
|
||||
}
|
||||
|
||||
func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
|
||||
if err := rw.session.Send(&contentapi.ReadResponse{
|
||||
if err := rw.session.Send(&api.ReadResponse{
|
||||
Offset: rw.offset,
|
||||
Data: p,
|
||||
}); err != nil {
|
||||
|
@ -110,14 +126,14 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
|
|||
return len(p), nil
|
||||
}
|
||||
|
||||
func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
|
||||
func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
var (
|
||||
ref string
|
||||
msg contentapi.WriteResponse
|
||||
req *contentapi.WriteRequest
|
||||
msg api.WriteResponse
|
||||
req *api.WriteRequest
|
||||
)
|
||||
|
||||
defer func(msg *contentapi.WriteResponse) {
|
||||
defer func(msg *api.WriteResponse) {
|
||||
// pump through the last message if no error was encountered
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -153,7 +169,7 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
|
|||
// cost of the move when they collide.
|
||||
if req.ExpectedDigest != "" {
|
||||
if _, err := s.store.Info(req.ExpectedDigest); err != nil {
|
||||
if !IsNotFound(err) {
|
||||
if !content.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -172,9 +188,9 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
|
|||
msg.UpdatedAt = ws.UpdatedAt
|
||||
|
||||
switch req.Action {
|
||||
case contentapi.WriteActionStat:
|
||||
case api.WriteActionStat:
|
||||
msg.Digest = wr.Digest()
|
||||
case contentapi.WriteActionWrite, contentapi.WriteActionCommit:
|
||||
case api.WriteActionWrite, api.WriteActionCommit:
|
||||
if req.Offset > 0 {
|
||||
// validate the offset if provided
|
||||
if req.Offset != ws.Offset {
|
||||
|
@ -200,10 +216,10 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
|
|||
msg.Offset += int64(n)
|
||||
}
|
||||
|
||||
if req.Action == contentapi.WriteActionCommit {
|
||||
if req.Action == api.WriteActionCommit {
|
||||
return wr.Commit(req.ExpectedSize, req.ExpectedDigest)
|
||||
}
|
||||
case contentapi.WriteActionAbort:
|
||||
case api.WriteActionAbort:
|
||||
return s.store.Abort(ref)
|
||||
}
|
||||
|
||||
|
@ -220,12 +236,12 @@ func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Status(*contentapi.StatusRequest, contentapi.Content_StatusServer) error {
|
||||
func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
|
||||
return grpc.Errorf(codes.Unimplemented, "not implemented")
|
||||
}
|
||||
|
||||
func maybeNotFoundGRPC(err error, id string) error {
|
||||
if IsNotFound(err) {
|
||||
if content.IsNotFound(err) {
|
||||
return grpc.Errorf(codes.NotFound, "%v: not found", id)
|
||||
}
|
||||
|
|
@ -1,15 +1,17 @@
|
|||
package containerd
|
||||
package execution
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/docker/containerd"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector, error) {
|
||||
func newCollector(ctx context.Context, runtimes map[string]containerd.Runtime) (*collector, error) {
|
||||
c := &collector{
|
||||
context: ctx,
|
||||
ch: make(chan *Event, 2048),
|
||||
ch: make(chan *containerd.Event, 2048),
|
||||
eventClients: make(map[*eventClient]struct{}),
|
||||
}
|
||||
for _, r := range runtimes {
|
||||
|
@ -27,7 +29,7 @@ func newCollector(ctx context.Context, runtimes map[string]Runtime) (*collector,
|
|||
|
||||
type eventClient struct {
|
||||
eCh chan error
|
||||
w EventWriter
|
||||
w *grpcEventWriter
|
||||
}
|
||||
|
||||
type collector struct {
|
||||
|
@ -35,12 +37,12 @@ type collector struct {
|
|||
wg sync.WaitGroup
|
||||
|
||||
context context.Context
|
||||
ch chan *Event
|
||||
ch chan *containerd.Event
|
||||
eventClients map[*eventClient]struct{}
|
||||
}
|
||||
|
||||
// collect collects events from the provided runtime
|
||||
func (c *collector) collect(r Runtime) error {
|
||||
func (c *collector) collect(r containerd.Runtime) error {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
|
@ -51,12 +53,7 @@ func (c *collector) collect(r Runtime) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Forward forwards all events from the collector to the EventWriters
|
||||
//
|
||||
// It forwards events until the channels are closed or the EventWriter
|
||||
// returns an error
|
||||
// This is a blocking call
|
||||
func (c *collector) forward(w EventWriter) error {
|
||||
func (c *collector) forward(w *grpcEventWriter) error {
|
||||
client := &eventClient{
|
||||
w: w,
|
||||
eCh: make(chan error, 1),
|
|
@ -1,11 +1,14 @@
|
|||
package execution
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/docker/containerd"
|
||||
api "github.com/docker/containerd/api/services/execution"
|
||||
"github.com/docker/containerd/api/types/container"
|
||||
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -13,15 +16,46 @@ var (
|
|||
empty = &google_protobuf.Empty{}
|
||||
)
|
||||
|
||||
// New creates a new GRPC service for the ContainerService
|
||||
func New(s *containerd.Supervisor) *Service {
|
||||
return &Service{
|
||||
s: s,
|
||||
func init() {
|
||||
containerd.Register("runtime-grpc", &containerd.Registration{
|
||||
Type: containerd.GRPCPlugin,
|
||||
Init: New,
|
||||
})
|
||||
}
|
||||
|
||||
func New(ic *containerd.InitContext) (interface{}, error) {
|
||||
c, err := newCollector(ic.Context, ic.Runtimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Service{
|
||||
runtimes: ic.Runtimes,
|
||||
containers: make(map[string]containerd.Container),
|
||||
collector: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
s *containerd.Supervisor
|
||||
mu sync.Mutex
|
||||
|
||||
runtimes map[string]containerd.Runtime
|
||||
containers map[string]containerd.Container
|
||||
collector *collector
|
||||
}
|
||||
|
||||
func (s *Service) Register(server *grpc.Server) error {
|
||||
api.RegisterContainerServiceServer(server, s)
|
||||
// load all containers
|
||||
for _, r := range s.runtimes {
|
||||
containers, err := r.Containers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, c := range containers {
|
||||
s.containers[c.Info().ID] = c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
|
||||
|
@ -41,13 +75,28 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
|||
Options: m.Options,
|
||||
})
|
||||
}
|
||||
c, err := s.s.Create(ctx, r.ID, r.Runtime, opts)
|
||||
runtime, err := s.getRuntime(r.Runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.mu.Lock()
|
||||
if _, ok := s.containers[r.ID]; ok {
|
||||
s.mu.Unlock()
|
||||
return nil, containerd.ErrContainerExists
|
||||
}
|
||||
c, err := runtime.Create(ctx, r.ID, opts)
|
||||
if err != nil {
|
||||
s.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
s.containers[r.ID] = c
|
||||
s.mu.Unlock()
|
||||
state, err := c.State(ctx)
|
||||
if err != nil {
|
||||
s.s.Delete(ctx, r.ID)
|
||||
s.mu.Lock()
|
||||
delete(s.containers, r.ID)
|
||||
runtime.Delete(ctx, c)
|
||||
s.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
return &api.CreateResponse{
|
||||
|
@ -57,7 +106,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
|
|||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
|
||||
c, err := s.s.Get(r.ID)
|
||||
c, err := s.getContainer(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -68,7 +117,15 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
|
|||
}
|
||||
|
||||
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_protobuf.Empty, error) {
|
||||
if err := s.s.Delete(ctx, r.ID); err != nil {
|
||||
c, err := s.getContainer(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runtime, err := s.getRuntime(c.Info().Runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := runtime.Delete(ctx, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return empty, nil
|
||||
|
@ -76,7 +133,9 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*google_pro
|
|||
|
||||
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
|
||||
resp := &api.ListResponse{}
|
||||
for _, c := range s.s.Containers() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, c := range s.containers {
|
||||
state, err := c.State(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -105,7 +164,25 @@ func (s *Service) Events(r *api.EventsRequest, server api.ContainerService_Event
|
|||
w := &grpcEventWriter{
|
||||
server: server,
|
||||
}
|
||||
return s.s.ForwardEvents(w)
|
||||
return s.collector.forward(w)
|
||||
}
|
||||
|
||||
func (s *Service) getContainer(id string) (containerd.Container, error) {
|
||||
s.mu.Lock()
|
||||
c, ok := s.containers[id]
|
||||
s.mu.Unlock()
|
||||
if !ok {
|
||||
return nil, containerd.ErrContainerNotExist
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (s *Service) getRuntime(name string) (containerd.Runtime, error) {
|
||||
runtime, ok := s.runtimes[name]
|
||||
if !ok {
|
||||
return nil, containerd.ErrUnknownRuntime
|
||||
}
|
||||
return runtime, nil
|
||||
}
|
||||
|
||||
type grpcEventWriter struct {
|
||||
|
|
100
supervisor.go
100
supervisor.go
|
@ -1,100 +0,0 @@
|
|||
package containerd
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func NewSupervisor(ctx context.Context, runtimes map[string]Runtime) (*Supervisor, error) {
|
||||
c, err := newCollector(ctx, runtimes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &Supervisor{
|
||||
containers: make(map[string]Container),
|
||||
runtimes: runtimes,
|
||||
collector: c,
|
||||
}
|
||||
for _, r := range runtimes {
|
||||
containers, err := r.Containers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, c := range containers {
|
||||
s.containers[c.Info().ID] = c
|
||||
}
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Supervisor supervises containers and events from multiple runtimes
|
||||
type Supervisor struct {
|
||||
mu sync.Mutex
|
||||
|
||||
containers map[string]Container
|
||||
runtimes map[string]Runtime
|
||||
collector *collector
|
||||
}
|
||||
|
||||
// ForwardEvents is a blocking method that will forward all events from the supervisor
|
||||
// to the EventWriter provided by the caller
|
||||
func (s *Supervisor) ForwardEvents(w EventWriter) error {
|
||||
return s.collector.forward(w)
|
||||
}
|
||||
|
||||
// Create creates a new container with the provided runtime
|
||||
func (s *Supervisor) Create(ctx context.Context, id, runtime string, opts CreateOpts) (Container, error) {
|
||||
r, ok := s.runtimes[runtime]
|
||||
if !ok {
|
||||
return nil, ErrUnknownRuntime
|
||||
}
|
||||
// check to make sure the container's id is unique across the entire system
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.containers[id]; ok {
|
||||
return nil, ErrContainerExists
|
||||
}
|
||||
c, err := r.Create(ctx, id, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.containers[c.Info().ID] = c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Delete deletes the container
|
||||
func (s *Supervisor) Delete(ctx context.Context, id string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
c, ok := s.containers[id]
|
||||
if !ok {
|
||||
return ErrContainerNotExist
|
||||
}
|
||||
err := s.runtimes[c.Info().Runtime].Delete(ctx, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
delete(s.containers, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Containers returns all the containers for the supervisor
|
||||
func (s *Supervisor) Containers() (o []Container) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, c := range s.containers {
|
||||
o = append(o, c)
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
func (s *Supervisor) Get(id string) (Container, error) {
|
||||
s.mu.Lock()
|
||||
c, ok := s.containers[id]
|
||||
s.mu.Unlock()
|
||||
if !ok {
|
||||
return nil, ErrContainerNotExist
|
||||
}
|
||||
return c, nil
|
||||
}
|
Loading…
Reference in a new issue