Merge pull request #43 from LK4D4/supervisor_package

Move supervisor to it's own package
This commit is contained in:
Michael Crosby 2015-12-17 16:23:47 -08:00
commit 1d63236c27
24 changed files with 61 additions and 59 deletions

View file

@ -8,19 +8,19 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd"
"github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/docker/containerd/supervisor"
"github.com/opencontainers/specs" "github.com/opencontainers/specs"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type apiServer struct { type apiServer struct {
sv *containerd.Supervisor sv *supervisor.Supervisor
} }
// NewServer returns grpc server instance // NewServer returns grpc server instance
func NewServer(sv *containerd.Supervisor) types.APIServer { func NewServer(sv *supervisor.Supervisor) types.APIServer {
return &apiServer{ return &apiServer{
sv: sv, sv: sv,
} }
@ -30,14 +30,14 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
if c.BundlePath == "" { if c.BundlePath == "" {
return nil, errors.New("empty bundle path") return nil, errors.New("empty bundle path")
} }
e := containerd.NewEvent(containerd.StartContainerEventType) e := supervisor.NewEvent(supervisor.StartContainerEventType)
e.ID = c.Id e.ID = c.Id
e.BundlePath = c.BundlePath e.BundlePath = c.BundlePath
e.Stdout = c.Stdout e.Stdout = c.Stdout
e.Stderr = c.Stderr e.Stderr = c.Stderr
e.Stdin = c.Stdin e.Stdin = c.Stdin
e.Console = c.Console e.Console = c.Console
e.StartResponse = make(chan containerd.StartResponse, 1) e.StartResponse = make(chan supervisor.StartResponse, 1)
if c.Checkpoint != "" { if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint, Name: c.Checkpoint,
@ -54,7 +54,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
} }
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
e := containerd.NewEvent(containerd.SignalEventType) e := supervisor.NewEvent(supervisor.SignalEventType)
e.ID = r.Id e.ID = r.Id
e.Pid = int(r.Pid) e.Pid = int(r.Pid)
e.Signal = syscall.Signal(int(r.Signal)) e.Signal = syscall.Signal(int(r.Signal))
@ -77,7 +77,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
AdditionalGids: r.User.AdditionalGids, AdditionalGids: r.User.AdditionalGids,
}, },
} }
e := containerd.NewEvent(containerd.AddProcessEventType) e := supervisor.NewEvent(supervisor.AddProcessEventType)
e.ID = r.Id e.ID = r.Id
e.Process = process e.Process = process
e.Console = r.Console e.Console = r.Console
@ -92,7 +92,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
} }
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
e := containerd.NewEvent(containerd.CreateCheckpointEventType) e := supervisor.NewEvent(supervisor.CreateCheckpointEventType)
e.ID = r.Id e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: r.Checkpoint.Name, Name: r.Checkpoint.Name,
@ -112,7 +112,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
if r.Name == "" { if r.Name == "" {
return nil, errors.New("checkpoint name cannot be empty") return nil, errors.New("checkpoint name cannot be empty")
} }
e := containerd.NewEvent(containerd.DeleteCheckpointEventType) e := supervisor.NewEvent(supervisor.DeleteCheckpointEventType)
e.ID = r.Id e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: r.Name, Name: r.Name,
@ -125,7 +125,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
} }
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
e := containerd.NewEvent(containerd.GetContainerEventType) e := supervisor.NewEvent(supervisor.GetContainerEventType)
s.sv.SendEvent(e) s.sv.SendEvent(e)
if err := <-e.Err; err != nil { if err := <-e.Err; err != nil {
return nil, err return nil, err
@ -159,7 +159,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR
} }
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) { func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
e := containerd.NewEvent(containerd.GetContainerEventType) e := supervisor.NewEvent(supervisor.GetContainerEventType)
s.sv.SendEvent(e) s.sv.SendEvent(e)
if err := <-e.Err; err != nil { if err := <-e.Err; err != nil {
return nil, err return nil, err
@ -208,7 +208,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St
} }
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
e := containerd.NewEvent(containerd.UpdateContainerEventType) e := supervisor.NewEvent(supervisor.UpdateContainerEventType)
e.ID = r.Id e.ID = r.Id
if r.Signal != 0 { if r.Signal != 0 {
e.Signal = syscall.Signal(r.Signal) e.Signal = syscall.Signal(r.Signal)
@ -229,14 +229,14 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
for evt := range events { for evt := range events {
var ev *types.Event var ev *types.Event
switch evt.Type { switch evt.Type {
case containerd.ExitEventType, containerd.ExecExitEventType: case supervisor.ExitEventType, supervisor.ExecExitEventType:
ev = &types.Event{ ev = &types.Event{
Type: "exit", Type: "exit",
Id: evt.ID, Id: evt.ID,
Pid: uint32(evt.Pid), Pid: uint32(evt.Pid),
Status: uint32(evt.Status), Status: uint32(evt.Status),
} }
case containerd.OOMEventType: case supervisor.OOMEventType:
ev = &types.Event{ ev = &types.Event{
Type: "oom", Type: "oom",
Id: evt.ID, Id: evt.ID,
@ -253,17 +253,17 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
} }
func (s *apiServer) GetStats(r *types.StatsRequest, stream types.API_GetStatsServer) error { func (s *apiServer) GetStats(r *types.StatsRequest, stream types.API_GetStatsServer) error {
e := containerd.NewEvent(containerd.StatsEventType) e := supervisor.NewEvent(supervisor.StatsEventType)
e.ID = r.Id e.ID = r.Id
s.sv.SendEvent(e) s.sv.SendEvent(e)
if err := <-e.Err; err != nil { if err := <-e.Err; err != nil {
if err == containerd.ErrContainerNotFound { if err == supervisor.ErrContainerNotFound {
return grpc.Errorf(codes.NotFound, err.Error()) return grpc.Errorf(codes.NotFound, err.Error())
} }
return err return err
} }
defer func() { defer func() {
ue := containerd.NewEvent(containerd.UnsubscribeStatsEventType) ue := supervisor.NewEvent(supervisor.UnsubscribeStatsEventType)
ue.ID = e.ID ue.ID = e.ID
ue.Stats = e.Stats ue.Stats = e.Stats
s.sv.SendEvent(ue) s.sv.SendEvent(ue)

View file

@ -1,4 +0,0 @@
package containerd
// DefaultBufferSize is the default size for a channel's buffer
const DefaultBufferSize = 2048

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/containerd" "github.com/docker/containerd"
"github.com/docker/containerd/api/grpc/server" "github.com/docker/containerd/api/grpc/server"
"github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/supervisor"
"github.com/docker/containerd/util" "github.com/docker/containerd/util"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
) )
@ -125,7 +126,7 @@ func checkLimits() error {
} }
func debugMetrics(interval time.Duration, graphiteAddr string) error { func debugMetrics(interval time.Duration, graphiteAddr string) error {
for name, m := range containerd.Metrics() { for name, m := range supervisor.Metrics() {
if err := metrics.DefaultRegistry.Register(name, m); err != nil { if err := metrics.DefaultRegistry.Register(name, m); err != nil {
return err return err
} }
@ -166,15 +167,15 @@ func processMetrics() {
} }
func daemon(id, address, stateDir string, concurrency int, oom bool) error { func daemon(id, address, stateDir string, concurrency int, oom bool) error {
tasks := make(chan *containerd.StartTask, concurrency*100) tasks := make(chan *supervisor.StartTask, concurrency*100)
supervisor, err := containerd.NewSupervisor(id, stateDir, tasks, oom) sv, err := supervisor.New(id, stateDir, tasks, oom)
if err != nil { if err != nil {
return err return err
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
wg.Add(1) wg.Add(1)
w := containerd.NewWorker(supervisor, wg) w := supervisor.NewWorker(sv, wg)
go w.Start() go w.Start()
} }
// only set containerd as the subreaper if it is not an init process // only set containerd as the subreaper if it is not an init process
@ -187,8 +188,8 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error {
} }
} }
// start the signal handler in the background. // start the signal handler in the background.
go startSignalHandler(supervisor, containerd.DefaultBufferSize) go startSignalHandler(sv)
if err := supervisor.Start(); err != nil { if err := sv.Start(); err != nil {
return err return err
} }
if err := os.RemoveAll(address); err != nil { if err := os.RemoveAll(address); err != nil {
@ -199,7 +200,7 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error {
return err return err
} }
s := grpc.NewServer() s := grpc.NewServer()
types.RegisterAPIServer(s, server.NewServer(supervisor)) types.RegisterAPIServer(s, server.NewServer(sv))
logrus.Debugf("GRPC API listen on %s", address) logrus.Debugf("GRPC API listen on %s", address)
return s.Serve(l) return s.Serve(l)
} }

View file

@ -8,16 +8,18 @@ import (
"syscall" "syscall"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd" "github.com/docker/containerd/supervisor"
"github.com/docker/containerd/util" "github.com/docker/containerd/util"
"github.com/opencontainers/runc/libcontainer/utils" "github.com/opencontainers/runc/libcontainer/utils"
) )
func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { const signalBufferSize = 2048
func startSignalHandler(supervisor *supervisor.Supervisor) {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"bufferSize": bufferSize, "bufferSize": signalBufferSize,
}).Debug("containerd: starting signal handler") }).Debug("containerd: starting signal handler")
signals := make(chan os.Signal, bufferSize) signals := make(chan os.Signal, signalBufferSize)
signal.Notify(signals) signal.Notify(signals)
for s := range signals { for s := range signals {
switch s { switch s {
@ -37,7 +39,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
os.Exit(0) os.Exit(0)
} }
func reap() (exits []*containerd.Event, err error) { func reap() (exits []*supervisor.Event, err error) {
var ( var (
ws syscall.WaitStatus ws syscall.WaitStatus
rus syscall.Rusage rus syscall.Rusage
@ -53,7 +55,7 @@ func reap() (exits []*containerd.Event, err error) {
if pid <= 0 { if pid <= 0 {
return exits, nil return exits, nil
} }
e := containerd.NewEvent(containerd.ExitEventType) e := supervisor.NewEvent(supervisor.ExitEventType)
e.Pid = pid e.Pid = pid
e.Status = utils.ExitStatus(ws) e.Status = utils.ExitStatus(ws)
exits = append(exits, e) exits = append(exits, e)

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "github.com/Sirupsen/logrus" import "github.com/Sirupsen/logrus"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
type CreateCheckpointEvent struct { type CreateCheckpointEvent struct {
s *Supervisor s *Supervisor

View file

@ -1,4 +1,4 @@
package containerd package supervisor
type StartEvent struct { type StartEvent struct {
s *Supervisor s *Supervisor

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "errors" import "errors"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"os" "os"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "github.com/Sirupsen/logrus" import "github.com/Sirupsen/logrus"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
type GetContainersEvent struct { type GetContainersEvent struct {
s *Supervisor s *Supervisor

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"io" "io"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "github.com/cloudfoundry/gosigar" import "github.com/cloudfoundry/gosigar"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"reflect" "reflect"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
type SignalEvent struct { type SignalEvent struct {
s *Supervisor s *Supervisor

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "github.com/rcrowley/go-metrics" import "github.com/rcrowley/go-metrics"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"bufio" "bufio"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"os" "os"
@ -14,10 +14,13 @@ import (
"github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer"
) )
const statsInterval = 1 * time.Second const (
statsInterval = 1 * time.Second
defaultBufferSize = 2048 // size of queue in eventloop
)
// NewSupervisor returns an initialized Process supervisor. // New returns an initialized Process supervisor.
func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) { func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) {
if err := os.MkdirAll(stateDir, 0755); err != nil { if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err return nil, err
} }
@ -39,7 +42,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super
machine: machine, machine: machine,
subscribers: make(map[chan *Event]struct{}), subscribers: make(map[chan *Event]struct{}),
statsCollector: newStatsCollector(statsInterval), statsCollector: newStatsCollector(statsInterval),
el: eventloop.NewChanLoop(DefaultBufferSize), el: eventloop.NewChanLoop(defaultBufferSize),
} }
if oom { if oom {
s.notifier = newNotifier(s) s.notifier = newNotifier(s)
@ -138,7 +141,7 @@ func (s *Supervisor) Close() error {
func (s *Supervisor) Events() chan *Event { func (s *Supervisor) Events() chan *Event {
s.subscriberLock.Lock() s.subscriberLock.Lock()
defer s.subscriberLock.Unlock() defer s.subscriberLock.Unlock()
c := make(chan *Event, DefaultBufferSize) c := make(chan *Event, defaultBufferSize)
EventSubscriberCounter.Inc(1) EventSubscriberCounter.Inc(1)
s.subscribers[c] = struct{}{} s.subscribers[c] = struct{}{}
return c return c

View file

@ -1,6 +1,6 @@
// +build libcontainer // +build libcontainer
package containerd package supervisor
import ( import (
"github.com/docker/containerd/linux" "github.com/docker/containerd/linux"

View file

@ -1,6 +1,6 @@
// +build runc // +build runc
package containerd package supervisor
import ( import (
"github.com/docker/containerd/runc" "github.com/docker/containerd/runc"

View file

@ -1,6 +1,6 @@
// +build !libcontainer,!runc // +build !libcontainer,!runc
package containerd package supervisor
import ( import (
"errors" "errors"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import "github.com/docker/containerd/runtime" import "github.com/docker/containerd/runtime"

View file

@ -1,4 +1,4 @@
package containerd package supervisor
import ( import (
"sync" "sync"