Move supervisor to it's own package
It allows to keep main namespace cleaner Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
		
							parent
							
								
									b296d50493
								
							
						
					
					
						commit
						69f8f566a2
					
				
					 24 changed files with 61 additions and 59 deletions
				
			
		|  | @ -8,19 +8,19 @@ import ( | |||
| 	"google.golang.org/grpc/codes" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/docker/containerd" | ||||
| 	"github.com/docker/containerd/api/grpc/types" | ||||
| 	"github.com/docker/containerd/runtime" | ||||
| 	"github.com/docker/containerd/supervisor" | ||||
| 	"github.com/opencontainers/specs" | ||||
| 	"golang.org/x/net/context" | ||||
| ) | ||||
| 
 | ||||
| type apiServer struct { | ||||
| 	sv *containerd.Supervisor | ||||
| 	sv *supervisor.Supervisor | ||||
| } | ||||
| 
 | ||||
| // NewServer returns grpc server instance | ||||
| func NewServer(sv *containerd.Supervisor) types.APIServer { | ||||
| func NewServer(sv *supervisor.Supervisor) types.APIServer { | ||||
| 	return &apiServer{ | ||||
| 		sv: sv, | ||||
| 	} | ||||
|  | @ -30,14 +30,14 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine | |||
| 	if c.BundlePath == "" { | ||||
| 		return nil, errors.New("empty bundle path") | ||||
| 	} | ||||
| 	e := containerd.NewEvent(containerd.StartContainerEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.StartContainerEventType) | ||||
| 	e.ID = c.Id | ||||
| 	e.BundlePath = c.BundlePath | ||||
| 	e.Stdout = c.Stdout | ||||
| 	e.Stderr = c.Stderr | ||||
| 	e.Stdin = c.Stdin | ||||
| 	e.Console = c.Console | ||||
| 	e.StartResponse = make(chan containerd.StartResponse, 1) | ||||
| 	e.StartResponse = make(chan supervisor.StartResponse, 1) | ||||
| 	if c.Checkpoint != "" { | ||||
| 		e.Checkpoint = &runtime.Checkpoint{ | ||||
| 			Name: c.Checkpoint, | ||||
|  | @ -54,7 +54,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { | ||||
| 	e := containerd.NewEvent(containerd.SignalEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.SignalEventType) | ||||
| 	e.ID = r.Id | ||||
| 	e.Pid = int(r.Pid) | ||||
| 	e.Signal = syscall.Signal(int(r.Signal)) | ||||
|  | @ -77,7 +77,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) | |||
| 			AdditionalGids: r.User.AdditionalGids, | ||||
| 		}, | ||||
| 	} | ||||
| 	e := containerd.NewEvent(containerd.AddProcessEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.AddProcessEventType) | ||||
| 	e.ID = r.Id | ||||
| 	e.Process = process | ||||
| 	e.Console = r.Console | ||||
|  | @ -92,7 +92,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { | ||||
| 	e := containerd.NewEvent(containerd.CreateCheckpointEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.CreateCheckpointEventType) | ||||
| 	e.ID = r.Id | ||||
| 	e.Checkpoint = &runtime.Checkpoint{ | ||||
| 		Name:        r.Checkpoint.Name, | ||||
|  | @ -112,7 +112,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo | |||
| 	if r.Name == "" { | ||||
| 		return nil, errors.New("checkpoint name cannot be empty") | ||||
| 	} | ||||
| 	e := containerd.NewEvent(containerd.DeleteCheckpointEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.DeleteCheckpointEventType) | ||||
| 	e.ID = r.Id | ||||
| 	e.Checkpoint = &runtime.Checkpoint{ | ||||
| 		Name: r.Name, | ||||
|  | @ -125,7 +125,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { | ||||
| 	e := containerd.NewEvent(containerd.GetContainerEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.GetContainerEventType) | ||||
| 	s.sv.SendEvent(e) | ||||
| 	if err := <-e.Err; err != nil { | ||||
| 		return nil, err | ||||
|  | @ -159,7 +159,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) { | ||||
| 	e := containerd.NewEvent(containerd.GetContainerEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.GetContainerEventType) | ||||
| 	s.sv.SendEvent(e) | ||||
| 	if err := <-e.Err; err != nil { | ||||
| 		return nil, err | ||||
|  | @ -208,7 +208,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { | ||||
| 	e := containerd.NewEvent(containerd.UpdateContainerEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.UpdateContainerEventType) | ||||
| 	e.ID = r.Id | ||||
| 	if r.Signal != 0 { | ||||
| 		e.Signal = syscall.Signal(r.Signal) | ||||
|  | @ -229,14 +229,14 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer | |||
| 	for evt := range events { | ||||
| 		var ev *types.Event | ||||
| 		switch evt.Type { | ||||
| 		case containerd.ExitEventType, containerd.ExecExitEventType: | ||||
| 		case supervisor.ExitEventType, supervisor.ExecExitEventType: | ||||
| 			ev = &types.Event{ | ||||
| 				Type:   "exit", | ||||
| 				Id:     evt.ID, | ||||
| 				Pid:    uint32(evt.Pid), | ||||
| 				Status: uint32(evt.Status), | ||||
| 			} | ||||
| 		case containerd.OOMEventType: | ||||
| 		case supervisor.OOMEventType: | ||||
| 			ev = &types.Event{ | ||||
| 				Type: "oom", | ||||
| 				Id:   evt.ID, | ||||
|  | @ -253,17 +253,17 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer | |||
| } | ||||
| 
 | ||||
| func (s *apiServer) GetStats(r *types.StatsRequest, stream types.API_GetStatsServer) error { | ||||
| 	e := containerd.NewEvent(containerd.StatsEventType) | ||||
| 	e := supervisor.NewEvent(supervisor.StatsEventType) | ||||
| 	e.ID = r.Id | ||||
| 	s.sv.SendEvent(e) | ||||
| 	if err := <-e.Err; err != nil { | ||||
| 		if err == containerd.ErrContainerNotFound { | ||||
| 		if err == supervisor.ErrContainerNotFound { | ||||
| 			return grpc.Errorf(codes.NotFound, err.Error()) | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		ue := containerd.NewEvent(containerd.UnsubscribeStatsEventType) | ||||
| 		ue := supervisor.NewEvent(supervisor.UnsubscribeStatsEventType) | ||||
| 		ue.ID = e.ID | ||||
| 		ue.Stats = e.Stats | ||||
| 		s.sv.SendEvent(ue) | ||||
|  |  | |||
|  | @ -1,4 +0,0 @@ | |||
| package containerd | ||||
| 
 | ||||
| // DefaultBufferSize is the default size for a channel's buffer | ||||
| const DefaultBufferSize = 2048 | ||||
|  | @ -17,6 +17,7 @@ import ( | |||
| 	"github.com/docker/containerd" | ||||
| 	"github.com/docker/containerd/api/grpc/server" | ||||
| 	"github.com/docker/containerd/api/grpc/types" | ||||
| 	"github.com/docker/containerd/supervisor" | ||||
| 	"github.com/docker/containerd/util" | ||||
| 	"github.com/rcrowley/go-metrics" | ||||
| ) | ||||
|  | @ -125,7 +126,7 @@ func checkLimits() error { | |||
| } | ||||
| 
 | ||||
| func debugMetrics(interval time.Duration, graphiteAddr string) error { | ||||
| 	for name, m := range containerd.Metrics() { | ||||
| 	for name, m := range supervisor.Metrics() { | ||||
| 		if err := metrics.DefaultRegistry.Register(name, m); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | @ -166,15 +167,15 @@ func processMetrics() { | |||
| } | ||||
| 
 | ||||
| func daemon(id, address, stateDir string, concurrency int, oom bool) error { | ||||
| 	tasks := make(chan *containerd.StartTask, concurrency*100) | ||||
| 	supervisor, err := containerd.NewSupervisor(id, stateDir, tasks, oom) | ||||
| 	tasks := make(chan *supervisor.StartTask, concurrency*100) | ||||
| 	sv, err := supervisor.New(id, stateDir, tasks, oom) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	wg := &sync.WaitGroup{} | ||||
| 	for i := 0; i < concurrency; i++ { | ||||
| 		wg.Add(1) | ||||
| 		w := containerd.NewWorker(supervisor, wg) | ||||
| 		w := supervisor.NewWorker(sv, wg) | ||||
| 		go w.Start() | ||||
| 	} | ||||
| 	// only set containerd as the subreaper if it is not an init process | ||||
|  | @ -187,8 +188,8 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error { | |||
| 		} | ||||
| 	} | ||||
| 	// start the signal handler in the background. | ||||
| 	go startSignalHandler(supervisor, containerd.DefaultBufferSize) | ||||
| 	if err := supervisor.Start(); err != nil { | ||||
| 	go startSignalHandler(sv) | ||||
| 	if err := sv.Start(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := os.RemoveAll(address); err != nil { | ||||
|  | @ -199,7 +200,7 @@ func daemon(id, address, stateDir string, concurrency int, oom bool) error { | |||
| 		return err | ||||
| 	} | ||||
| 	s := grpc.NewServer() | ||||
| 	types.RegisterAPIServer(s, server.NewServer(supervisor)) | ||||
| 	types.RegisterAPIServer(s, server.NewServer(sv)) | ||||
| 	logrus.Debugf("GRPC API listen on %s", address) | ||||
| 	return s.Serve(l) | ||||
| } | ||||
|  |  | |||
|  | @ -8,16 +8,18 @@ import ( | |||
| 	"syscall" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/docker/containerd" | ||||
| 	"github.com/docker/containerd/supervisor" | ||||
| 	"github.com/docker/containerd/util" | ||||
| 	"github.com/opencontainers/runc/libcontainer/utils" | ||||
| ) | ||||
| 
 | ||||
| func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { | ||||
| const signalBufferSize = 2048 | ||||
| 
 | ||||
| func startSignalHandler(supervisor *supervisor.Supervisor) { | ||||
| 	logrus.WithFields(logrus.Fields{ | ||||
| 		"bufferSize": bufferSize, | ||||
| 		"bufferSize": signalBufferSize, | ||||
| 	}).Debug("containerd: starting signal handler") | ||||
| 	signals := make(chan os.Signal, bufferSize) | ||||
| 	signals := make(chan os.Signal, signalBufferSize) | ||||
| 	signal.Notify(signals) | ||||
| 	for s := range signals { | ||||
| 		switch s { | ||||
|  | @ -37,7 +39,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { | |||
| 	os.Exit(0) | ||||
| } | ||||
| 
 | ||||
| func reap() (exits []*containerd.Event, err error) { | ||||
| func reap() (exits []*supervisor.Event, err error) { | ||||
| 	var ( | ||||
| 		ws  syscall.WaitStatus | ||||
| 		rus syscall.Rusage | ||||
|  | @ -53,7 +55,7 @@ func reap() (exits []*containerd.Event, err error) { | |||
| 		if pid <= 0 { | ||||
| 			return exits, nil | ||||
| 		} | ||||
| 		e := containerd.NewEvent(containerd.ExitEventType) | ||||
| 		e := supervisor.NewEvent(supervisor.ExitEventType) | ||||
| 		e.Pid = pid | ||||
| 		e.Status = utils.ExitStatus(ws) | ||||
| 		exits = append(exits, e) | ||||
|  |  | |||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "github.com/Sirupsen/logrus" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| type CreateCheckpointEvent struct { | ||||
| 	s *Supervisor | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| type StartEvent struct { | ||||
| 	s *Supervisor | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/Sirupsen/logrus" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "errors" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"os" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "github.com/Sirupsen/logrus" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| type GetContainersEvent struct { | ||||
| 	s *Supervisor | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"io" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "github.com/cloudfoundry/gosigar" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"reflect" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| type SignalEvent struct { | ||||
| 	s *Supervisor | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "github.com/rcrowley/go-metrics" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"os" | ||||
|  | @ -14,10 +14,13 @@ import ( | |||
| 	"github.com/opencontainers/runc/libcontainer" | ||||
| ) | ||||
| 
 | ||||
| const statsInterval = 1 * time.Second | ||||
| const ( | ||||
| 	statsInterval     = 1 * time.Second | ||||
| 	defaultBufferSize = 2048 // size of queue in eventloop | ||||
| ) | ||||
| 
 | ||||
| // NewSupervisor returns an initialized Process supervisor. | ||||
| func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) { | ||||
| // New returns an initialized Process supervisor. | ||||
| func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) { | ||||
| 	if err := os.MkdirAll(stateDir, 0755); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | @ -39,7 +42,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super | |||
| 		machine:        machine, | ||||
| 		subscribers:    make(map[chan *Event]struct{}), | ||||
| 		statsCollector: newStatsCollector(statsInterval), | ||||
| 		el:             eventloop.NewChanLoop(DefaultBufferSize), | ||||
| 		el:             eventloop.NewChanLoop(defaultBufferSize), | ||||
| 	} | ||||
| 	if oom { | ||||
| 		s.notifier = newNotifier(s) | ||||
|  | @ -138,7 +141,7 @@ func (s *Supervisor) Close() error { | |||
| func (s *Supervisor) Events() chan *Event { | ||||
| 	s.subscriberLock.Lock() | ||||
| 	defer s.subscriberLock.Unlock() | ||||
| 	c := make(chan *Event, DefaultBufferSize) | ||||
| 	c := make(chan *Event, defaultBufferSize) | ||||
| 	EventSubscriberCounter.Inc(1) | ||||
| 	s.subscribers[c] = struct{}{} | ||||
| 	return c | ||||
|  | @ -1,6 +1,6 @@ | |||
| // +build libcontainer | ||||
| 
 | ||||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/docker/containerd/linux" | ||||
|  | @ -1,6 +1,6 @@ | |||
| // +build runc | ||||
| 
 | ||||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/docker/containerd/runc" | ||||
|  | @ -1,6 +1,6 @@ | |||
| // +build !libcontainer,!runc | ||||
| 
 | ||||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import "github.com/docker/containerd/runtime" | ||||
| 
 | ||||
|  | @ -1,4 +1,4 @@ | |||
| package containerd | ||||
| package supervisor | ||||
| 
 | ||||
| import ( | ||||
| 	"sync" | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue