diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 371fe71..7e4461b 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -8,19 +8,19 @@ import ( "google.golang.org/grpc/codes" "github.com/Sirupsen/logrus" - "github.com/docker/containerd" "github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/runtime" + "github.com/docker/containerd/supervisor" "github.com/opencontainers/specs" "golang.org/x/net/context" ) type apiServer struct { - sv *containerd.Supervisor + sv *supervisor.Supervisor } // NewServer returns grpc server instance -func NewServer(sv *containerd.Supervisor) types.APIServer { +func NewServer(sv *supervisor.Supervisor) types.APIServer { return &apiServer{ sv: sv, } @@ -30,14 +30,14 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine if c.BundlePath == "" { return nil, errors.New("empty bundle path") } - e := containerd.NewEvent(containerd.StartContainerEventType) + e := supervisor.NewEvent(supervisor.StartContainerEventType) e.ID = c.Id e.BundlePath = c.BundlePath e.Stdout = c.Stdout e.Stderr = c.Stderr e.Stdin = c.Stdin e.Console = c.Console - e.StartResponse = make(chan containerd.StartResponse, 1) + e.StartResponse = make(chan supervisor.StartResponse, 1) if c.Checkpoint != "" { e.Checkpoint = &runtime.Checkpoint{ Name: c.Checkpoint, @@ -54,7 +54,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine } func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { - e := containerd.NewEvent(containerd.SignalEventType) + e := supervisor.NewEvent(supervisor.SignalEventType) e.ID = r.Id e.Pid = int(r.Pid) e.Signal = syscall.Signal(int(r.Signal)) @@ -77,7 +77,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) AdditionalGids: r.User.AdditionalGids, }, } - e := containerd.NewEvent(containerd.AddProcessEventType) + e := supervisor.NewEvent(supervisor.AddProcessEventType) e.ID = r.Id e.Process = process e.Console = r.Console @@ -92,7 +92,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) } func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { - e := containerd.NewEvent(containerd.CreateCheckpointEventType) + e := supervisor.NewEvent(supervisor.CreateCheckpointEventType) e.ID = r.Id e.Checkpoint = &runtime.Checkpoint{ Name: r.Checkpoint.Name, @@ -112,7 +112,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo if r.Name == "" { return nil, errors.New("checkpoint name cannot be empty") } - e := containerd.NewEvent(containerd.DeleteCheckpointEventType) + e := supervisor.NewEvent(supervisor.DeleteCheckpointEventType) e.ID = r.Id e.Checkpoint = &runtime.Checkpoint{ Name: r.Name, @@ -125,7 +125,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo } func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { - e := containerd.NewEvent(containerd.GetContainerEventType) + e := supervisor.NewEvent(supervisor.GetContainerEventType) s.sv.SendEvent(e) if err := <-e.Err; err != nil { return nil, err @@ -159,7 +159,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR } func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) { - e := containerd.NewEvent(containerd.GetContainerEventType) + e := supervisor.NewEvent(supervisor.GetContainerEventType) s.sv.SendEvent(e) if err := <-e.Err; err != nil { return nil, err @@ -208,7 +208,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St } func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { - e := containerd.NewEvent(containerd.UpdateContainerEventType) + e := supervisor.NewEvent(supervisor.UpdateContainerEventType) e.ID = r.Id if r.Signal != 0 { e.Signal = syscall.Signal(r.Signal) @@ -229,14 +229,14 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer for evt := range events { var ev *types.Event switch evt.Type { - case containerd.ExitEventType, containerd.ExecExitEventType: + case supervisor.ExitEventType, supervisor.ExecExitEventType: ev = &types.Event{ Type: "exit", Id: evt.ID, Pid: uint32(evt.Pid), Status: uint32(evt.Status), } - case containerd.OOMEventType: + case supervisor.OOMEventType: ev = &types.Event{ Type: "oom", Id: evt.ID, @@ -253,17 +253,17 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer } func (s *apiServer) GetStats(r *types.StatsRequest, stream types.API_GetStatsServer) error { - e := containerd.NewEvent(containerd.StatsEventType) + e := supervisor.NewEvent(supervisor.StatsEventType) e.ID = r.Id s.sv.SendEvent(e) if err := <-e.Err; err != nil { - if err == containerd.ErrContainerNotFound { + if err == supervisor.ErrContainerNotFound { return grpc.Errorf(codes.NotFound, err.Error()) } return err } defer func() { - ue := containerd.NewEvent(containerd.UnsubscribeStatsEventType) + ue := supervisor.NewEvent(supervisor.UnsubscribeStatsEventType) ue.ID = e.ID ue.Stats = e.Stats s.sv.SendEvent(ue) diff --git a/buffer.go b/buffer.go deleted file mode 100644 index 96fadaf..0000000 --- a/buffer.go +++ /dev/null @@ -1,4 +0,0 @@ -package containerd - -// DefaultBufferSize is the default size for a channel's buffer -const DefaultBufferSize = 2048 diff --git a/containerd/main.go b/containerd/main.go index 50149b7..eebeb9f 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -17,6 +17,7 @@ import ( "github.com/docker/containerd" "github.com/docker/containerd/api/grpc/server" "github.com/docker/containerd/api/grpc/types" + "github.com/docker/containerd/supervisor" "github.com/docker/containerd/util" "github.com/rcrowley/go-metrics" ) @@ -125,7 +126,7 @@ func checkLimits() error { } func debugMetrics(interval time.Duration, graphiteAddr string) error { - for name, m := range containerd.Metrics() { + for name, m := range supervisor.Metrics() { if err := metrics.DefaultRegistry.Register(name, m); err != nil { return err } @@ -166,15 +167,15 @@ func processMetrics() { } func daemon(id, address, stateDir string, concurrency int, oom bool) error { - tasks := make(chan *containerd.StartTask, concurrency*100) - supervisor, err := containerd.NewSupervisor(id, stateDir, tasks, oom) + tasks := make(chan *supervisor.StartTask, concurrency*100) + sv, err := supervisor.New(id, stateDir, tasks, oom) if err != nil { return err } wg := &sync.WaitGroup{} for i := 0; i < concurrency; i++ { wg.Add(1) - w := containerd.NewWorker(supervisor, wg) + w := supervisor.NewWorker(sv, wg) go w.Start() } // only set containerd as the subreaper if it is not an init process @@ -187,8 +188,8 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error { } } // start the signal handler in the background. - go startSignalHandler(supervisor, containerd.DefaultBufferSize) - if err := supervisor.Start(); err != nil { + go startSignalHandler(sv) + if err := sv.Start(); err != nil { return err } if err := os.RemoveAll(address); err != nil { @@ -199,7 +200,7 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error { return err } s := grpc.NewServer() - types.RegisterAPIServer(s, server.NewServer(supervisor)) + types.RegisterAPIServer(s, server.NewServer(sv)) logrus.Debugf("GRPC API listen on %s", address) return s.Serve(l) } diff --git a/containerd/reap_linux.go b/containerd/reap_linux.go index d739287..0459c46 100644 --- a/containerd/reap_linux.go +++ b/containerd/reap_linux.go @@ -8,16 +8,18 @@ import ( "syscall" "github.com/Sirupsen/logrus" - "github.com/docker/containerd" + "github.com/docker/containerd/supervisor" "github.com/docker/containerd/util" "github.com/opencontainers/runc/libcontainer/utils" ) -func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { +const signalBufferSize = 2048 + +func startSignalHandler(supervisor *supervisor.Supervisor) { logrus.WithFields(logrus.Fields{ - "bufferSize": bufferSize, + "bufferSize": signalBufferSize, }).Debug("containerd: starting signal handler") - signals := make(chan os.Signal, bufferSize) + signals := make(chan os.Signal, signalBufferSize) signal.Notify(signals) for s := range signals { switch s { @@ -37,7 +39,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { os.Exit(0) } -func reap() (exits []*containerd.Event, err error) { +func reap() (exits []*supervisor.Event, err error) { var ( ws syscall.WaitStatus rus syscall.Rusage @@ -53,7 +55,7 @@ func reap() (exits []*containerd.Event, err error) { if pid <= 0 { return exits, nil } - e := containerd.NewEvent(containerd.ExitEventType) + e := supervisor.NewEvent(supervisor.ExitEventType) e.Pid = pid e.Status = utils.ExitStatus(ws) exits = append(exits, e) diff --git a/add_process.go b/supervisor/add_process.go similarity index 97% rename from add_process.go rename to supervisor/add_process.go index bc7d72e..ab1fb4d 100644 --- a/add_process.go +++ b/supervisor/add_process.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "github.com/Sirupsen/logrus" diff --git a/checkpoint.go b/supervisor/checkpoint.go similarity index 96% rename from checkpoint.go rename to supervisor/checkpoint.go index 3fb7f0d..a5ae540 100644 --- a/checkpoint.go +++ b/supervisor/checkpoint.go @@ -1,4 +1,4 @@ -package containerd +package supervisor type CreateCheckpointEvent struct { s *Supervisor diff --git a/create.go b/supervisor/create.go similarity index 97% rename from create.go rename to supervisor/create.go index 657a684..b9e6572 100644 --- a/create.go +++ b/supervisor/create.go @@ -1,4 +1,4 @@ -package containerd +package supervisor type StartEvent struct { s *Supervisor diff --git a/delete.go b/supervisor/delete.go similarity index 97% rename from delete.go rename to supervisor/delete.go index c940cb8..94e4225 100644 --- a/delete.go +++ b/supervisor/delete.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "github.com/Sirupsen/logrus" diff --git a/errors.go b/supervisor/errors.go similarity index 98% rename from errors.go rename to supervisor/errors.go index 7e7eb42..9645ef1 100644 --- a/errors.go +++ b/supervisor/errors.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "errors" diff --git a/event.go b/supervisor/event.go similarity index 99% rename from event.go rename to supervisor/event.go index 540dac2..08a2415 100644 --- a/event.go +++ b/supervisor/event.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "os" diff --git a/exit.go b/supervisor/exit.go similarity index 98% rename from exit.go rename to supervisor/exit.go index 3163878..32a0fbb 100644 --- a/exit.go +++ b/supervisor/exit.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "github.com/Sirupsen/logrus" diff --git a/get_containers.go b/supervisor/get_containers.go similarity index 91% rename from get_containers.go rename to supervisor/get_containers.go index f8d898e..0b21e36 100644 --- a/get_containers.go +++ b/supervisor/get_containers.go @@ -1,4 +1,4 @@ -package containerd +package supervisor type GetContainersEvent struct { s *Supervisor diff --git a/io.go b/supervisor/io.go similarity index 98% rename from io.go rename to supervisor/io.go index e75f194..fb43f07 100644 --- a/io.go +++ b/supervisor/io.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "io" diff --git a/machine.go b/supervisor/machine.go similarity index 95% rename from machine.go rename to supervisor/machine.go index 7d5cee3..e25932a 100644 --- a/machine.go +++ b/supervisor/machine.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "github.com/cloudfoundry/gosigar" diff --git a/oom.go b/supervisor/oom.go similarity index 98% rename from oom.go rename to supervisor/oom.go index 9044979..1455b35 100644 --- a/oom.go +++ b/supervisor/oom.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "reflect" diff --git a/signal.go b/supervisor/signal.go similarity index 95% rename from signal.go rename to supervisor/signal.go index 77abc30..7d0fcab 100644 --- a/signal.go +++ b/supervisor/signal.go @@ -1,4 +1,4 @@ -package containerd +package supervisor type SignalEvent struct { s *Supervisor diff --git a/stats.go b/supervisor/stats.go similarity index 98% rename from stats.go rename to supervisor/stats.go index c4c23f8..379dab4 100644 --- a/stats.go +++ b/supervisor/stats.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "github.com/rcrowley/go-metrics" diff --git a/stats_collector.go b/supervisor/stats_collector.go similarity index 99% rename from stats_collector.go rename to supervisor/stats_collector.go index f802cad..f45b4ae 100644 --- a/stats_collector.go +++ b/supervisor/stats_collector.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "bufio" diff --git a/supervisor.go b/supervisor/supervisor.go similarity index 94% rename from supervisor.go rename to supervisor/supervisor.go index 9406899..df8bcfc 100644 --- a/supervisor.go +++ b/supervisor/supervisor.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "os" @@ -14,10 +14,13 @@ import ( "github.com/opencontainers/runc/libcontainer" ) -const statsInterval = 1 * time.Second +const ( + statsInterval = 1 * time.Second + defaultBufferSize = 2048 // size of queue in eventloop +) -// NewSupervisor returns an initialized Process supervisor. -func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) { +// New returns an initialized Process supervisor. +func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) { if err := os.MkdirAll(stateDir, 0755); err != nil { return nil, err } @@ -39,7 +42,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super machine: machine, subscribers: make(map[chan *Event]struct{}), statsCollector: newStatsCollector(statsInterval), - el: eventloop.NewChanLoop(DefaultBufferSize), + el: eventloop.NewChanLoop(defaultBufferSize), } if oom { s.notifier = newNotifier(s) @@ -138,7 +141,7 @@ func (s *Supervisor) Close() error { func (s *Supervisor) Events() chan *Event { s.subscriberLock.Lock() defer s.subscriberLock.Unlock() - c := make(chan *Event, DefaultBufferSize) + c := make(chan *Event, defaultBufferSize) EventSubscriberCounter.Inc(1) s.subscribers[c] = struct{}{} return c diff --git a/supervisor_linux.go b/supervisor/supervisor_linux.go similarity index 91% rename from supervisor_linux.go rename to supervisor/supervisor_linux.go index 1eb69d9..ec75542 100644 --- a/supervisor_linux.go +++ b/supervisor/supervisor_linux.go @@ -1,6 +1,6 @@ // +build libcontainer -package containerd +package supervisor import ( "github.com/docker/containerd/linux" diff --git a/supervisor_runc.go b/supervisor/supervisor_runc.go similarity index 91% rename from supervisor_runc.go rename to supervisor/supervisor_runc.go index 4f47411..a0b7b49 100644 --- a/supervisor_runc.go +++ b/supervisor/supervisor_runc.go @@ -1,6 +1,6 @@ // +build runc -package containerd +package supervisor import ( "github.com/docker/containerd/runc" diff --git a/supervisor_unsupported.go b/supervisor/supervisor_unsupported.go similarity index 91% rename from supervisor_unsupported.go rename to supervisor/supervisor_unsupported.go index 3f5a322..2b54893 100644 --- a/supervisor_unsupported.go +++ b/supervisor/supervisor_unsupported.go @@ -1,6 +1,6 @@ // +build !libcontainer,!runc -package containerd +package supervisor import ( "errors" diff --git a/update.go b/supervisor/update.go similarity index 97% rename from update.go rename to supervisor/update.go index 925715d..ae42879 100644 --- a/update.go +++ b/supervisor/update.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import "github.com/docker/containerd/runtime" diff --git a/worker.go b/supervisor/worker.go similarity index 98% rename from worker.go rename to supervisor/worker.go index 040be27..d6ef78d 100644 --- a/worker.go +++ b/supervisor/worker.go @@ -1,4 +1,4 @@ -package containerd +package supervisor import ( "sync"