diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index bc696e7..9429be2 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -1,8 +1,12 @@ package server import ( + "bufio" "errors" "fmt" + "os" + "strconv" + "strings" "syscall" "time" @@ -13,6 +17,10 @@ import ( "github.com/docker/containerd/runtime" "github.com/docker/containerd/specs" "github.com/docker/containerd/supervisor" + "github.com/opencontainers/runc/libcontainer" + "github.com/opencontainers/runc/libcontainer/cgroups" + "github.com/opencontainers/runc/libcontainer/system" + ocs "github.com/opencontainers/runtime-spec/specs-go" "golang.org/x/net/context" ) @@ -40,7 +48,11 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e.Labels = c.Labels e.NoPivotRoot = c.NoPivotRoot e.StartResponse = make(chan supervisor.StartResponse, 1) - createContainerConfigCheckpoint(e, c) + if c.Checkpoint != "" { + e.Checkpoint = &runtime.Checkpoint{ + Name: c.Checkpoint, + } + } s.sv.SendTask(e) if err := <-e.ErrorCh(); err != nil { return nil, err @@ -55,6 +67,73 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine }, nil } +func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { + e := &supervisor.CreateCheckpointTask{} + e.ID = r.Id + e.Checkpoint = &runtime.Checkpoint{ + Name: r.Checkpoint.Name, + Exit: r.Checkpoint.Exit, + Tcp: r.Checkpoint.Tcp, + UnixSockets: r.Checkpoint.UnixSockets, + Shell: r.Checkpoint.Shell, + } + s.sv.SendTask(e) + if err := <-e.ErrorCh(); err != nil { + return nil, err + } + return &types.CreateCheckpointResponse{}, nil +} + +func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) { + if r.Name == "" { + return nil, errors.New("checkpoint name cannot be empty") + } + e := &supervisor.DeleteCheckpointTask{} + e.ID = r.Id + e.Checkpoint = &runtime.Checkpoint{ + Name: r.Name, + } + s.sv.SendTask(e) + if err := <-e.ErrorCh(); err != nil { + return nil, err + } + return &types.DeleteCheckpointResponse{}, nil +} + +func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { + e := &supervisor.GetContainersTask{} + s.sv.SendTask(e) + if err := <-e.ErrorCh(); err != nil { + return nil, err + } + var container runtime.Container + for _, c := range e.Containers { + if c.ID() == r.Id { + container = c + break + } + } + if container == nil { + return nil, grpc.Errorf(codes.NotFound, "no such containers") + } + var out []*types.Checkpoint + checkpoints, err := container.Checkpoints() + if err != nil { + return nil, err + } + for _, c := range checkpoints { + out = append(out, &types.Checkpoint{ + Name: c.Name, + Tcp: c.Tcp, + Shell: c.Shell, + UnixSockets: c.UnixSockets, + // TODO: figure out timestamp + //Timestamp: c.Timestamp, + }) + } + return &types.ListCheckpointResponse{Checkpoints: out}, nil +} + func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { e := &supervisor.SignalTask{} e.ID = r.Id @@ -74,8 +153,22 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) Env: r.Env, Cwd: r.Cwd, } - setPlatformRuntimeProcessSpecUserFields(r, process) - + process.User = ocs.User{ + UID: r.User.Uid, + GID: r.User.Gid, + AdditionalGids: r.User.AdditionalGids, + } + process.Capabilities = r.Capabilities + process.ApparmorProfile = r.ApparmorProfile + process.SelinuxLabel = r.SelinuxLabel + process.NoNewPrivileges = r.NoNewPrivileges + for _, rl := range r.Rlimits { + process.Rlimits = append(process.Rlimits, ocs.Rlimit{ + Type: rl.Type, + Soft: rl.Soft, + Hard: rl.Hard, + }) + } if r.Id == "" { return nil, fmt.Errorf("container id cannot be empty") } @@ -131,7 +224,7 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er for _, p := range processes { oldProc := p.Spec() stdio := p.Stdio() - appendToProcs := &types.Process{ + proc := &types.Process{ Pid: p.ID(), SystemPid: uint32(p.SystemPid()), Terminal: oldProc.Terminal, @@ -142,8 +235,23 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er Stdout: stdio.Stdout, Stderr: stdio.Stderr, } - setUserFieldsInProcess(appendToProcs, oldProc) - procs = append(procs, appendToProcs) + proc.User = &types.User{ + Uid: oldProc.User.UID, + Gid: oldProc.User.GID, + AdditionalGids: oldProc.User.AdditionalGids, + } + proc.Capabilities = oldProc.Capabilities + proc.ApparmorProfile = oldProc.ApparmorProfile + proc.SelinuxLabel = oldProc.SelinuxLabel + proc.NoNewPrivileges = oldProc.NoNewPrivileges + for _, rl := range oldProc.Rlimits { + proc.Rlimits = append(proc.Rlimits, &types.Rlimit{ + Type: rl.Type, + Soft: rl.Soft, + Hard: rl.Hard, + }) + } + procs = append(procs, proc) } var pids []int state, err := c.Status() @@ -254,3 +362,153 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer } return nil } + +func convertToPb(st *runtime.Stat) *types.StatsResponse { + pbSt := &types.StatsResponse{ + Timestamp: uint64(st.Timestamp.Unix()), + CgroupStats: &types.CgroupStats{}, + } + lcSt, ok := st.Data.(*libcontainer.Stats) + if !ok { + return pbSt + } + cpuSt := lcSt.CgroupStats.CpuStats + systemUsage, _ := getSystemCPUUsage() + pbSt.CgroupStats.CpuStats = &types.CpuStats{ + CpuUsage: &types.CpuUsage{ + TotalUsage: cpuSt.CpuUsage.TotalUsage, + PercpuUsage: cpuSt.CpuUsage.PercpuUsage, + UsageInKernelmode: cpuSt.CpuUsage.UsageInKernelmode, + UsageInUsermode: cpuSt.CpuUsage.UsageInUsermode, + }, + ThrottlingData: &types.ThrottlingData{ + Periods: cpuSt.ThrottlingData.Periods, + ThrottledPeriods: cpuSt.ThrottlingData.ThrottledPeriods, + ThrottledTime: cpuSt.ThrottlingData.ThrottledTime, + }, + SystemUsage: systemUsage, + } + memSt := lcSt.CgroupStats.MemoryStats + pbSt.CgroupStats.MemoryStats = &types.MemoryStats{ + Cache: memSt.Cache, + Usage: &types.MemoryData{ + Usage: memSt.Usage.Usage, + MaxUsage: memSt.Usage.MaxUsage, + Failcnt: memSt.Usage.Failcnt, + Limit: memSt.Usage.Limit, + }, + SwapUsage: &types.MemoryData{ + Usage: memSt.SwapUsage.Usage, + MaxUsage: memSt.SwapUsage.MaxUsage, + Failcnt: memSt.SwapUsage.Failcnt, + Limit: memSt.SwapUsage.Limit, + }, + KernelUsage: &types.MemoryData{ + Usage: memSt.KernelUsage.Usage, + MaxUsage: memSt.KernelUsage.MaxUsage, + Failcnt: memSt.KernelUsage.Failcnt, + Limit: memSt.KernelUsage.Limit, + }, + } + blkSt := lcSt.CgroupStats.BlkioStats + pbSt.CgroupStats.BlkioStats = &types.BlkioStats{ + IoServiceBytesRecursive: convertBlkioEntryToPb(blkSt.IoServiceBytesRecursive), + IoServicedRecursive: convertBlkioEntryToPb(blkSt.IoServicedRecursive), + IoQueuedRecursive: convertBlkioEntryToPb(blkSt.IoQueuedRecursive), + IoServiceTimeRecursive: convertBlkioEntryToPb(blkSt.IoServiceTimeRecursive), + IoWaitTimeRecursive: convertBlkioEntryToPb(blkSt.IoWaitTimeRecursive), + IoMergedRecursive: convertBlkioEntryToPb(blkSt.IoMergedRecursive), + IoTimeRecursive: convertBlkioEntryToPb(blkSt.IoTimeRecursive), + SectorsRecursive: convertBlkioEntryToPb(blkSt.SectorsRecursive), + } + pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats) + for k, st := range lcSt.CgroupStats.HugetlbStats { + pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{ + Usage: st.Usage, + MaxUsage: st.MaxUsage, + Failcnt: st.Failcnt, + } + } + pbSt.CgroupStats.PidsStats = &types.PidsStats{ + Current: lcSt.CgroupStats.PidsStats.Current, + Limit: lcSt.CgroupStats.PidsStats.Limit, + } + return pbSt +} + +func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry { + var pbEs []*types.BlkioStatsEntry + for _, e := range b { + pbEs = append(pbEs, &types.BlkioStatsEntry{ + Major: e.Major, + Minor: e.Minor, + Op: e.Op, + Value: e.Value, + }) + } + return pbEs +} + +const nanoSecondsPerSecond = 1e9 + +var clockTicksPerSecond = uint64(system.GetClockTicks()) + +// getSystemCPUUsage returns the host system's cpu usage in +// nanoseconds. An error is returned if the format of the underlying +// file does not match. +// +// Uses /proc/stat defined by POSIX. Looks for the cpu +// statistics line and then sums up the first seven fields +// provided. See `man 5 proc` for details on specific field +// information. +func getSystemCPUUsage() (uint64, error) { + var line string + f, err := os.Open("/proc/stat") + if err != nil { + return 0, err + } + bufReader := bufio.NewReaderSize(nil, 128) + defer func() { + bufReader.Reset(nil) + f.Close() + }() + bufReader.Reset(f) + err = nil + for err == nil { + line, err = bufReader.ReadString('\n') + if err != nil { + break + } + parts := strings.Fields(line) + switch parts[0] { + case "cpu": + if len(parts) < 8 { + return 0, fmt.Errorf("bad format of cpu stats") + } + var totalClockTicks uint64 + for _, i := range parts[1:8] { + v, err := strconv.ParseUint(i, 10, 64) + if err != nil { + return 0, fmt.Errorf("error parsing cpu stats") + } + totalClockTicks += v + } + return (totalClockTicks * nanoSecondsPerSecond) / + clockTicksPerSecond, nil + } + } + return 0, fmt.Errorf("bad stats format") +} + +func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { + e := &supervisor.StatsTask{} + e.ID = r.Id + e.Stat = make(chan *runtime.Stat, 1) + s.sv.SendTask(e) + if err := <-e.ErrorCh(); err != nil { + return nil, err + } + stats := <-e.Stat + t := convertToPb(stats) + return t, nil +} diff --git a/api/grpc/server/server_linux.go b/api/grpc/server/server_linux.go deleted file mode 100644 index be7a148..0000000 --- a/api/grpc/server/server_linux.go +++ /dev/null @@ -1,285 +0,0 @@ -package server - -import ( - "bufio" - "errors" - "fmt" - "os" - "strconv" - "strings" - - "github.com/docker/containerd/api/grpc/types" - "github.com/docker/containerd/runtime" - "github.com/docker/containerd/specs" - "github.com/docker/containerd/supervisor" - "github.com/opencontainers/runc/libcontainer" - "github.com/opencontainers/runc/libcontainer/cgroups" - "github.com/opencontainers/runc/libcontainer/system" - ocs "github.com/opencontainers/runtime-spec/specs-go" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" -) - -func createContainerConfigCheckpoint(e *supervisor.StartTask, c *types.CreateContainerRequest) { - if c.Checkpoint != "" { - e.Checkpoint = &runtime.Checkpoint{ - Name: c.Checkpoint, - } - } -} - -func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { - e := &supervisor.CreateCheckpointTask{} - e.ID = r.Id - e.Checkpoint = &runtime.Checkpoint{ - Name: r.Checkpoint.Name, - Exit: r.Checkpoint.Exit, - Tcp: r.Checkpoint.Tcp, - UnixSockets: r.Checkpoint.UnixSockets, - Shell: r.Checkpoint.Shell, - } - s.sv.SendTask(e) - if err := <-e.ErrorCh(); err != nil { - return nil, err - } - return &types.CreateCheckpointResponse{}, nil -} - -func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) { - if r.Name == "" { - return nil, errors.New("checkpoint name cannot be empty") - } - e := &supervisor.DeleteCheckpointTask{} - e.ID = r.Id - e.Checkpoint = &runtime.Checkpoint{ - Name: r.Name, - } - s.sv.SendTask(e) - if err := <-e.ErrorCh(); err != nil { - return nil, err - } - return &types.DeleteCheckpointResponse{}, nil -} - -func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { - e := &supervisor.GetContainersTask{} - s.sv.SendTask(e) - if err := <-e.ErrorCh(); err != nil { - return nil, err - } - var container runtime.Container - for _, c := range e.Containers { - if c.ID() == r.Id { - container = c - break - } - } - if container == nil { - return nil, grpc.Errorf(codes.NotFound, "no such containers") - } - var out []*types.Checkpoint - checkpoints, err := container.Checkpoints() - if err != nil { - return nil, err - } - for _, c := range checkpoints { - out = append(out, &types.Checkpoint{ - Name: c.Name, - Tcp: c.Tcp, - Shell: c.Shell, - UnixSockets: c.UnixSockets, - // TODO: figure out timestamp - //Timestamp: c.Timestamp, - }) - } - return &types.ListCheckpointResponse{Checkpoints: out}, nil -} - -func convertToPb(st *runtime.Stat) *types.StatsResponse { - pbSt := &types.StatsResponse{ - Timestamp: uint64(st.Timestamp.Unix()), - CgroupStats: &types.CgroupStats{}, - } - lcSt, ok := st.Data.(*libcontainer.Stats) - if !ok { - return pbSt - } - cpuSt := lcSt.CgroupStats.CpuStats - systemUsage, _ := getSystemCPUUsage() - pbSt.CgroupStats.CpuStats = &types.CpuStats{ - CpuUsage: &types.CpuUsage{ - TotalUsage: cpuSt.CpuUsage.TotalUsage, - PercpuUsage: cpuSt.CpuUsage.PercpuUsage, - UsageInKernelmode: cpuSt.CpuUsage.UsageInKernelmode, - UsageInUsermode: cpuSt.CpuUsage.UsageInUsermode, - }, - ThrottlingData: &types.ThrottlingData{ - Periods: cpuSt.ThrottlingData.Periods, - ThrottledPeriods: cpuSt.ThrottlingData.ThrottledPeriods, - ThrottledTime: cpuSt.ThrottlingData.ThrottledTime, - }, - SystemUsage: systemUsage, - } - memSt := lcSt.CgroupStats.MemoryStats - pbSt.CgroupStats.MemoryStats = &types.MemoryStats{ - Cache: memSt.Cache, - Usage: &types.MemoryData{ - Usage: memSt.Usage.Usage, - MaxUsage: memSt.Usage.MaxUsage, - Failcnt: memSt.Usage.Failcnt, - Limit: memSt.Usage.Limit, - }, - SwapUsage: &types.MemoryData{ - Usage: memSt.SwapUsage.Usage, - MaxUsage: memSt.SwapUsage.MaxUsage, - Failcnt: memSt.SwapUsage.Failcnt, - Limit: memSt.SwapUsage.Limit, - }, - KernelUsage: &types.MemoryData{ - Usage: memSt.KernelUsage.Usage, - MaxUsage: memSt.KernelUsage.MaxUsage, - Failcnt: memSt.KernelUsage.Failcnt, - Limit: memSt.KernelUsage.Limit, - }, - } - blkSt := lcSt.CgroupStats.BlkioStats - pbSt.CgroupStats.BlkioStats = &types.BlkioStats{ - IoServiceBytesRecursive: convertBlkioEntryToPb(blkSt.IoServiceBytesRecursive), - IoServicedRecursive: convertBlkioEntryToPb(blkSt.IoServicedRecursive), - IoQueuedRecursive: convertBlkioEntryToPb(blkSt.IoQueuedRecursive), - IoServiceTimeRecursive: convertBlkioEntryToPb(blkSt.IoServiceTimeRecursive), - IoWaitTimeRecursive: convertBlkioEntryToPb(blkSt.IoWaitTimeRecursive), - IoMergedRecursive: convertBlkioEntryToPb(blkSt.IoMergedRecursive), - IoTimeRecursive: convertBlkioEntryToPb(blkSt.IoTimeRecursive), - SectorsRecursive: convertBlkioEntryToPb(blkSt.SectorsRecursive), - } - pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats) - for k, st := range lcSt.CgroupStats.HugetlbStats { - pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{ - Usage: st.Usage, - MaxUsage: st.MaxUsage, - Failcnt: st.Failcnt, - } - } - pbSt.CgroupStats.PidsStats = &types.PidsStats{ - Current: lcSt.CgroupStats.PidsStats.Current, - Limit: lcSt.CgroupStats.PidsStats.Limit, - } - return pbSt -} - -func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry { - var pbEs []*types.BlkioStatsEntry - for _, e := range b { - pbEs = append(pbEs, &types.BlkioStatsEntry{ - Major: e.Major, - Minor: e.Minor, - Op: e.Op, - Value: e.Value, - }) - } - return pbEs -} - -const nanoSecondsPerSecond = 1e9 - -var clockTicksPerSecond = uint64(system.GetClockTicks()) - -// getSystemCPUUsage returns the host system's cpu usage in -// nanoseconds. An error is returned if the format of the underlying -// file does not match. -// -// Uses /proc/stat defined by POSIX. Looks for the cpu -// statistics line and then sums up the first seven fields -// provided. See `man 5 proc` for details on specific field -// information. -func getSystemCPUUsage() (uint64, error) { - var line string - f, err := os.Open("/proc/stat") - if err != nil { - return 0, err - } - bufReader := bufio.NewReaderSize(nil, 128) - defer func() { - bufReader.Reset(nil) - f.Close() - }() - bufReader.Reset(f) - err = nil - for err == nil { - line, err = bufReader.ReadString('\n') - if err != nil { - break - } - parts := strings.Fields(line) - switch parts[0] { - case "cpu": - if len(parts) < 8 { - return 0, fmt.Errorf("bad format of cpu stats") - } - var totalClockTicks uint64 - for _, i := range parts[1:8] { - v, err := strconv.ParseUint(i, 10, 64) - if err != nil { - return 0, fmt.Errorf("error parsing cpu stats") - } - totalClockTicks += v - } - return (totalClockTicks * nanoSecondsPerSecond) / - clockTicksPerSecond, nil - } - } - return 0, fmt.Errorf("bad stats format") -} - -func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { - e := &supervisor.StatsTask{} - e.ID = r.Id - e.Stat = make(chan *runtime.Stat, 1) - s.sv.SendTask(e) - if err := <-e.ErrorCh(); err != nil { - return nil, err - } - stats := <-e.Stat - t := convertToPb(stats) - return t, nil -} - -func setUserFieldsInProcess(p *types.Process, oldProc specs.ProcessSpec) { - p.User = &types.User{ - Uid: oldProc.User.UID, - Gid: oldProc.User.GID, - AdditionalGids: oldProc.User.AdditionalGids, - } - p.Capabilities = oldProc.Capabilities - p.ApparmorProfile = oldProc.ApparmorProfile - p.SelinuxLabel = oldProc.SelinuxLabel - p.NoNewPrivileges = oldProc.NoNewPrivileges - for _, rl := range oldProc.Rlimits { - p.Rlimits = append(p.Rlimits, &types.Rlimit{ - Type: rl.Type, - Soft: rl.Soft, - Hard: rl.Hard, - }) - } -} - -func setPlatformRuntimeProcessSpecUserFields(r *types.AddProcessRequest, process *specs.ProcessSpec) { - process.User = ocs.User{ - UID: r.User.Uid, - GID: r.User.Gid, - AdditionalGids: r.User.AdditionalGids, - } - process.Capabilities = r.Capabilities - process.ApparmorProfile = r.ApparmorProfile - process.SelinuxLabel = r.SelinuxLabel - process.NoNewPrivileges = r.NoNewPrivileges - for _, rl := range r.Rlimits { - process.Rlimits = append(process.Rlimits, ocs.Rlimit{ - Type: rl.Type, - Soft: rl.Soft, - Hard: rl.Hard, - }) - } -} diff --git a/api/grpc/server/server_windows.go b/api/grpc/server/server_windows.go deleted file mode 100644 index 96b34e7..0000000 --- a/api/grpc/server/server_windows.go +++ /dev/null @@ -1,39 +0,0 @@ -package server - -import ( - "errors" - - "github.com/docker/containerd/api/grpc/types" - "github.com/docker/containerd/specs" - "github.com/docker/containerd/supervisor" - "golang.org/x/net/context" -) - -// noop on Windows (Checkpoints not supported) -func createContainerConfigCheckpoint(e *supervisor.StartTask, c *types.CreateContainerRequest) { -} - -// TODO Windows - may be able to completely factor out -func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { - return nil, errors.New("CreateCheckpoint() not supported on Windows") -} - -// TODO Windows - may be able to completely factor out -func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) { - return nil, errors.New("DeleteCheckpoint() not supported on Windows") -} - -// TODO Windows - may be able to completely factor out -func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { - return nil, errors.New("ListCheckpoint() not supported on Windows") -} - -func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { - return nil, errors.New("Stats() not supported on Windows") -} - -func setUserFieldsInProcess(p *types.Process, oldProc specs.ProcessSpec) { -} - -func setPlatformRuntimeProcessSpecUserFields(r *types.User, process *specs.ProcessSpec) { -} diff --git a/containerd/main.go b/containerd/main.go index e39e1cb..635c089 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -2,8 +2,11 @@ package main import ( "fmt" + "log" + "net" "os" "os/signal" + "runtime" "strings" "sync" "syscall" @@ -12,18 +15,24 @@ import ( "google.golang.org/grpc" "github.com/Sirupsen/logrus" + "github.com/cloudfoundry/gosigar" "github.com/codegangsta/cli" + "github.com/cyberdelia/go-metrics-graphite" "github.com/docker/containerd" "github.com/docker/containerd/api/grpc/server" "github.com/docker/containerd/api/grpc/types" + "github.com/docker/containerd/api/http/pprof" "github.com/docker/containerd/osutils" "github.com/docker/containerd/supervisor" "github.com/docker/docker/pkg/listeners" + "github.com/rcrowley/go-metrics" ) const ( - usage = `High performance container daemon` - minRlimit = 1024 + usage = `High performance container daemon` + minRlimit = 1024 + defaultStateDir = "/run/containerd" + defaultGRPCEndpoint = "unix:///run/containerd/containerd.sock" ) var daemonFlags = []cli.Flag{ @@ -75,11 +84,14 @@ var daemonFlags = []cli.Flag{ Value: 500, Usage: "number of past events to keep in the event log", }, + cli.StringFlag{ + Name: "graphite-address", + Usage: "Address of graphite server", + }, } func main() { logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano}) - appendPlatformFlags() app := cli.NewApp() app.Name = "containerd" if containerd.GitCommit != "" { @@ -89,7 +101,24 @@ func main() { } app.Usage = usage app.Flags = daemonFlags - setAppBefore(app) + app.Before = func(context *cli.Context) error { + if context.GlobalBool("debug") { + logrus.SetLevel(logrus.DebugLevel) + if context.GlobalDuration("metrics-interval") > 0 { + if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil { + return err + } + } + + } + if p := context.GlobalString("pprof-address"); len(p) > 0 { + pprof.Enable(p) + } + if err := checkLimits(); err != nil { + return err + } + return nil + } app.Action = func(context *cli.Context) { if err := daemon(context); err != nil { @@ -183,3 +212,72 @@ func getDefaultID() string { } return hostname } + +func checkLimits() error { + var l syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &l); err != nil { + return err + } + if l.Cur <= minRlimit { + logrus.WithFields(logrus.Fields{ + "current": l.Cur, + "max": l.Max, + }).Warn("containerd: low RLIMIT_NOFILE changing to max") + l.Cur = l.Max + return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &l) + } + return nil +} + +func processMetrics() { + var ( + g = metrics.NewGauge() + fg = metrics.NewGauge() + memg = metrics.NewGauge() + ) + metrics.DefaultRegistry.Register("goroutines", g) + metrics.DefaultRegistry.Register("fds", fg) + metrics.DefaultRegistry.Register("memory-used", memg) + collect := func() { + // update number of goroutines + g.Update(int64(runtime.NumGoroutine())) + // collect the number of open fds + fds, err := osutils.GetOpenFds(os.Getpid()) + if err != nil { + logrus.WithField("error", err).Error("containerd: get open fd count") + } + fg.Update(int64(fds)) + // get the memory used + m := sigar.ProcMem{} + if err := m.Get(os.Getpid()); err != nil { + logrus.WithField("error", err).Error("containerd: get pid memory information") + } + memg.Update(int64(m.Size)) + } + go func() { + collect() + for range time.Tick(30 * time.Second) { + collect() + } + }() +} + +func debugMetrics(interval time.Duration, graphiteAddr string) error { + for name, m := range supervisor.Metrics() { + if err := metrics.DefaultRegistry.Register(name, m); err != nil { + return err + } + } + processMetrics() + if graphiteAddr != "" { + addr, err := net.ResolveTCPAddr("tcp", graphiteAddr) + if err != nil { + return err + } + go graphite.Graphite(metrics.DefaultRegistry, 10e9, "metrics", addr) + } else { + l := log.New(os.Stdout, "[containerd] ", log.LstdFlags) + go metrics.Log(metrics.DefaultRegistry, interval, l) + } + return nil +} diff --git a/containerd/main_linux.go b/containerd/main_linux.go deleted file mode 100644 index 02922c2..0000000 --- a/containerd/main_linux.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -import ( - "log" - "net" - "os" - "runtime" - "syscall" - "time" - - "github.com/Sirupsen/logrus" - "github.com/cloudfoundry/gosigar" - "github.com/codegangsta/cli" - "github.com/cyberdelia/go-metrics-graphite" - "github.com/docker/containerd/api/http/pprof" - "github.com/docker/containerd/osutils" - "github.com/docker/containerd/supervisor" - "github.com/rcrowley/go-metrics" -) - -const ( - defaultStateDir = "/run/containerd" - defaultGRPCEndpoint = "unix:///run/containerd/containerd.sock" -) - -func appendPlatformFlags() { - daemonFlags = append(daemonFlags, cli.StringFlag{ - Name: "graphite-address", - Usage: "Address of graphite server", - }) -} - -func setAppBefore(app *cli.App) { - app.Before = func(context *cli.Context) error { - if context.GlobalBool("debug") { - logrus.SetLevel(logrus.DebugLevel) - if context.GlobalDuration("metrics-interval") > 0 { - if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil { - return err - } - } - - } - if p := context.GlobalString("pprof-address"); len(p) > 0 { - pprof.Enable(p) - } - if err := checkLimits(); err != nil { - return err - } - return nil - } -} - -func checkLimits() error { - var l syscall.Rlimit - if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &l); err != nil { - return err - } - if l.Cur <= minRlimit { - logrus.WithFields(logrus.Fields{ - "current": l.Cur, - "max": l.Max, - }).Warn("containerd: low RLIMIT_NOFILE changing to max") - l.Cur = l.Max - return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &l) - } - return nil -} - -func processMetrics() { - var ( - g = metrics.NewGauge() - fg = metrics.NewGauge() - memg = metrics.NewGauge() - ) - metrics.DefaultRegistry.Register("goroutines", g) - metrics.DefaultRegistry.Register("fds", fg) - metrics.DefaultRegistry.Register("memory-used", memg) - collect := func() { - // update number of goroutines - g.Update(int64(runtime.NumGoroutine())) - // collect the number of open fds - fds, err := osutils.GetOpenFds(os.Getpid()) - if err != nil { - logrus.WithField("error", err).Error("containerd: get open fd count") - } - fg.Update(int64(fds)) - // get the memory used - m := sigar.ProcMem{} - if err := m.Get(os.Getpid()); err != nil { - logrus.WithField("error", err).Error("containerd: get pid memory information") - } - memg.Update(int64(m.Size)) - } - go func() { - collect() - for range time.Tick(30 * time.Second) { - collect() - } - }() -} - -func debugMetrics(interval time.Duration, graphiteAddr string) error { - for name, m := range supervisor.Metrics() { - if err := metrics.DefaultRegistry.Register(name, m); err != nil { - return err - } - } - processMetrics() - if graphiteAddr != "" { - addr, err := net.ResolveTCPAddr("tcp", graphiteAddr) - if err != nil { - return err - } - go graphite.Graphite(metrics.DefaultRegistry, 10e9, "metrics", addr) - } else { - l := log.New(os.Stdout, "[containerd] ", log.LstdFlags) - go metrics.Log(metrics.DefaultRegistry, interval, l) - } - return nil -} diff --git a/ctr/container.go b/ctr/container.go index b488f9c..1e9b57c 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -621,3 +621,23 @@ type stdio struct { stdout string stderr string } + +func createStdio() (s stdio, err error) { + tmp, err := ioutil.TempDir("", "ctr-") + if err != nil { + return s, err + } + // create fifo's for the process + for name, fd := range map[string]*string{ + "stdin": &s.stdin, + "stdout": &s.stdout, + "stderr": &s.stderr, + } { + path := filepath.Join(tmp, name) + if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { + return s, err + } + *fd = path + } + return s, nil +} diff --git a/ctr/container_unix.go b/ctr/container_unix.go deleted file mode 100644 index bd31ecc..0000000 --- a/ctr/container_unix.go +++ /dev/null @@ -1,30 +0,0 @@ -// +build !windows - -package main - -import ( - "io/ioutil" - "os" - "path/filepath" - "syscall" -) - -func createStdio() (s stdio, err error) { - tmp, err := ioutil.TempDir("", "ctr-") - if err != nil { - return s, err - } - // create fifo's for the process - for name, fd := range map[string]*string{ - "stdin": &s.stdin, - "stdout": &s.stdout, - "stderr": &s.stderr, - } { - path := filepath.Join(tmp, name) - if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { - return s, err - } - *fd = path - } - return s, nil -} diff --git a/ctr/container_windows.go b/ctr/container_windows.go deleted file mode 100644 index 9348b3f..0000000 --- a/ctr/container_windows.go +++ /dev/null @@ -1,6 +0,0 @@ -package main - -// TODO Windows: This will have a very different implementation -func createStdio() (s stdio, err error) { - return stdio{}, nil -} diff --git a/osutils/windows.go b/osutils/windows.go deleted file mode 100644 index 4908c05..0000000 --- a/osutils/windows.go +++ /dev/null @@ -1,9 +0,0 @@ -// +build windows - -package osutils - -// GetOpenFds returns the number of open fds for the process provided by pid -// Not supported on Windows (same as for docker daemon) -func GetOpenFds(pid int) (int, error) { - return -1, nil -} diff --git a/runtime/container.go b/runtime/container.go index 6a3db7d..933fc05 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -2,15 +2,20 @@ package runtime import ( "encoding/json" + "fmt" "io" "io/ioutil" "os" "os/exec" "path/filepath" + "strings" + "syscall" "time" "github.com/Sirupsen/logrus" "github.com/docker/containerd/specs" + "github.com/opencontainers/runc/libcontainer" + ocs "github.com/opencontainers/runtime-spec/specs-go" ) type Container interface { @@ -271,3 +276,485 @@ func (c *container) UpdateResources(r *Resource) error { config.Cgroups.Resources.MemorySwap = r.MemorySwap return container.Set(config) } + +func getRootIDs(s *specs.Spec) (int, int, error) { + if s == nil { + return 0, 0, nil + } + var hasUserns bool + for _, ns := range s.Linux.Namespaces { + if ns.Type == ocs.UserNamespace { + hasUserns = true + break + } + } + if !hasUserns { + return 0, 0, nil + } + uid := hostIDFromMap(0, s.Linux.UIDMappings) + gid := hostIDFromMap(0, s.Linux.GIDMappings) + return uid, gid, nil +} + +func (c *container) State() State { + proc := c.processes["init"] + if proc == nil { + return Stopped + } + return proc.State() +} + +func (c *container) Runtime() string { + return c.runtime +} + +func (c *container) Pause() error { + args := c.runtimeArgs + args = append(args, "pause", c.id) + b, err := exec.Command(c.runtime, args...).CombinedOutput() + if err != nil { + return fmt.Errorf(string(b)) + } + return nil +} + +func (c *container) Resume() error { + args := c.runtimeArgs + args = append(args, "resume", c.id) + b, err := exec.Command(c.runtime, args...).CombinedOutput() + if err != nil { + return fmt.Errorf(string(b)) + } + return nil +} + +func (c *container) Checkpoints() ([]Checkpoint, error) { + dirs, err := ioutil.ReadDir(filepath.Join(c.bundle, "checkpoints")) + if err != nil { + return nil, err + } + var out []Checkpoint + for _, d := range dirs { + if !d.IsDir() { + continue + } + path := filepath.Join(c.bundle, "checkpoints", d.Name(), "config.json") + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var cpt Checkpoint + if err := json.Unmarshal(data, &cpt); err != nil { + return nil, err + } + out = append(out, cpt) + } + return out, nil +} + +func (c *container) Checkpoint(cpt Checkpoint) error { + if err := os.MkdirAll(filepath.Join(c.bundle, "checkpoints"), 0755); err != nil { + return err + } + path := filepath.Join(c.bundle, "checkpoints", cpt.Name) + if err := os.Mkdir(path, 0755); err != nil { + return err + } + f, err := os.Create(filepath.Join(path, "config.json")) + if err != nil { + return err + } + cpt.Created = time.Now() + err = json.NewEncoder(f).Encode(cpt) + f.Close() + if err != nil { + return err + } + args := []string{ + "checkpoint", + "--image-path", path, + } + add := func(flags ...string) { + args = append(args, flags...) + } + add(c.runtimeArgs...) + if !cpt.Exit { + add("--leave-running") + } + if cpt.Shell { + add("--shell-job") + } + if cpt.Tcp { + add("--tcp-established") + } + if cpt.UnixSockets { + add("--ext-unix-sk") + } + add(c.id) + out, err := exec.Command(c.runtime, args...).CombinedOutput() + if err != nil { + return fmt.Errorf("%s: %s", err.Error(), string(out)) + } + return err +} + +func (c *container) DeleteCheckpoint(name string) error { + return os.RemoveAll(filepath.Join(c.bundle, "checkpoints", name)) +} + +func (c *container) Start(checkpoint string, s Stdio) (Process, error) { + processRoot := filepath.Join(c.root, c.id, InitProcessID) + if err := os.Mkdir(processRoot, 0755); err != nil { + return nil, err + } + cmd := exec.Command(c.shim, + c.id, c.bundle, c.runtime, + ) + cmd.Dir = processRoot + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + spec, err := c.readSpec() + if err != nil { + return nil, err + } + config := &processConfig{ + checkpoint: checkpoint, + root: processRoot, + id: InitProcessID, + c: c, + stdio: s, + spec: spec, + processSpec: specs.ProcessSpec(spec.Process), + } + p, err := newProcess(config) + if err != nil { + return nil, err + } + if err := c.startCmd(InitProcessID, cmd, p); err != nil { + return nil, err + } + return p, nil +} + +func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) { + processRoot := filepath.Join(c.root, c.id, pid) + if err := os.Mkdir(processRoot, 0755); err != nil { + return nil, err + } + defer func() { + if err != nil { + c.RemoveProcess(pid) + } + }() + cmd := exec.Command(c.shim, + c.id, c.bundle, c.runtime, + ) + cmd.Dir = processRoot + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + spec, err := c.readSpec() + if err != nil { + return nil, err + } + config := &processConfig{ + exec: true, + id: pid, + root: processRoot, + c: c, + processSpec: pspec, + spec: spec, + stdio: s, + } + p, err := newProcess(config) + if err != nil { + return nil, err + } + if err := c.startCmd(pid, cmd, p); err != nil { + return nil, err + } + return p, nil +} + +func (c *container) startCmd(pid string, cmd *exec.Cmd, p *process) error { + if err := cmd.Start(); err != nil { + if exErr, ok := err.(*exec.Error); ok { + if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist { + return fmt.Errorf("%s not installed on system", c.shim) + } + } + return err + } + if err := c.waitForStart(p, cmd); err != nil { + return err + } + c.processes[pid] = p + return nil +} + +func (c *container) getLibctContainer() (libcontainer.Container, error) { + runtimeRoot := "/run/runc" + + // Check that the root wasn't changed + for _, opt := range c.runtimeArgs { + if strings.HasPrefix(opt, "--root=") { + runtimeRoot = strings.TrimPrefix(opt, "--root=") + break + } + } + + f, err := libcontainer.New(runtimeRoot, libcontainer.Cgroupfs) + if err != nil { + return nil, err + } + return f.Load(c.id) +} + +func hostIDFromMap(id uint32, mp []ocs.IDMapping) int { + for _, m := range mp { + if (id >= m.ContainerID) && (id <= (m.ContainerID + m.Size - 1)) { + return int(m.HostID + (id - m.ContainerID)) + } + } + return 0 +} + +func (c *container) Pids() ([]int, error) { + container, err := c.getLibctContainer() + if err != nil { + return nil, err + } + return container.Processes() +} + +func (c *container) Stats() (*Stat, error) { + container, err := c.getLibctContainer() + if err != nil { + return nil, err + } + now := time.Now() + stats, err := container.Stats() + if err != nil { + return nil, err + } + return &Stat{ + Timestamp: now, + Data: stats, + }, nil +} + +func (c *container) OOM() (OOM, error) { + container, err := c.getLibctContainer() + if err != nil { + if lerr, ok := err.(libcontainer.Error); ok { + // with oom registration sometimes the container can run, exit, and be destroyed + // faster than we can get the state back so we can just ignore this + if lerr.Code() == libcontainer.ContainerNotExists { + return nil, ErrContainerExited + } + } + return nil, err + } + state, err := container.State() + if err != nil { + return nil, err + } + memoryPath := state.CgroupPaths["memory"] + return c.getMemeoryEventFD(memoryPath) +} + +// Status implements the runtime Container interface. +func (c *container) Status() (State, error) { + args := c.runtimeArgs + args = append(args, "state", c.id) + + out, err := exec.Command(c.runtime, args...).CombinedOutput() + if err != nil { + return "", fmt.Errorf(string(out)) + } + + // We only require the runtime json output to have a top level Status field. + var s struct { + Status State `json:"status"` + } + if err := json.Unmarshal(out, &s); err != nil { + return "", err + } + return s.Status, nil +} + +func (c *container) getMemeoryEventFD(root string) (*oom, error) { + f, err := os.Open(filepath.Join(root, "memory.oom_control")) + if err != nil { + return nil, err + } + fd, _, serr := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, syscall.FD_CLOEXEC, 0) + if serr != 0 { + f.Close() + return nil, serr + } + if err := c.writeEventFD(root, int(f.Fd()), int(fd)); err != nil { + syscall.Close(int(fd)) + f.Close() + return nil, err + } + return &oom{ + root: root, + id: c.id, + eventfd: int(fd), + control: f, + }, nil +} + +func (c *container) writeEventFD(root string, cfd, efd int) error { + f, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(fmt.Sprintf("%d %d", efd, cfd)) + return err +} + +type waitArgs struct { + pid int + err error +} + +func (c *container) waitForStart(p *process, cmd *exec.Cmd) error { + wc := make(chan error, 1) + go func() { + for { + if _, err := p.getPidFromFile(); err != nil { + if os.IsNotExist(err) || err == errInvalidPidInt { + alive, err := isAlive(cmd) + if err != nil { + wc <- err + return + } + if !alive { + // runc could have failed to run the container so lets get the error + // out of the logs or the shim could have encountered an error + messages, err := readLogMessages(filepath.Join(p.root, "shim-log.json")) + if err != nil { + wc <- err + return + } + for _, m := range messages { + if m.Level == "error" { + wc <- fmt.Errorf("shim error: %v", m.Msg) + return + } + } + // no errors reported back from shim, check for runc/runtime errors + messages, err = readLogMessages(filepath.Join(p.root, "log.json")) + if err != nil { + if os.IsNotExist(err) { + err = ErrContainerNotStarted + } + wc <- err + return + } + for _, m := range messages { + if m.Level == "error" { + wc <- fmt.Errorf("oci runtime error: %v", m.Msg) + return + } + } + wc <- ErrContainerNotStarted + return + } + time.Sleep(15 * time.Millisecond) + continue + } + wc <- err + return + } + // the pid file was read successfully + wc <- nil + return + } + }() + select { + case err := <-wc: + if err != nil { + return err + } + return nil + case <-time.After(c.timeout): + cmd.Process.Kill() + cmd.Wait() + return ErrContainerStartTimeout + } +} + +// isAlive checks if the shim that launched the container is still alive +func isAlive(cmd *exec.Cmd) (bool, error) { + if err := syscall.Kill(cmd.Process.Pid, 0); err != nil { + if err == syscall.ESRCH { + return false, nil + } + return false, err + } + return true, nil +} + +type oom struct { + id string + root string + control *os.File + eventfd int +} + +func (o *oom) ContainerID() string { + return o.id +} + +func (o *oom) FD() int { + return o.eventfd +} + +func (o *oom) Flush() { + buf := make([]byte, 8) + syscall.Read(o.eventfd, buf) +} + +func (o *oom) Removed() bool { + _, err := os.Lstat(filepath.Join(o.root, "cgroup.event_control")) + return os.IsNotExist(err) +} + +func (o *oom) Close() error { + err := syscall.Close(o.eventfd) + if cerr := o.control.Close(); err == nil { + err = cerr + } + return err +} + +type message struct { + Level string `json:"level"` + Msg string `json:"msg"` +} + +func readLogMessages(path string) ([]message, error) { + var out []message + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + dec := json.NewDecoder(f) + for { + var m message + if err := dec.Decode(&m); err != nil { + if err == io.EOF { + break + } + return nil, err + } + out = append(out, m) + } + return out, nil +} diff --git a/runtime/container_linux.go b/runtime/container_linux.go deleted file mode 100644 index 4f03219..0000000 --- a/runtime/container_linux.go +++ /dev/null @@ -1,500 +0,0 @@ -package runtime - -import ( - "encoding/json" - "fmt" - "io" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strings" - "syscall" - "time" - - "github.com/docker/containerd/specs" - "github.com/opencontainers/runc/libcontainer" - ocs "github.com/opencontainers/runtime-spec/specs-go" -) - -func getRootIDs(s *specs.Spec) (int, int, error) { - if s == nil { - return 0, 0, nil - } - var hasUserns bool - for _, ns := range s.Linux.Namespaces { - if ns.Type == ocs.UserNamespace { - hasUserns = true - break - } - } - if !hasUserns { - return 0, 0, nil - } - uid := hostIDFromMap(0, s.Linux.UIDMappings) - gid := hostIDFromMap(0, s.Linux.GIDMappings) - return uid, gid, nil -} - -func (c *container) State() State { - proc := c.processes["init"] - if proc == nil { - return Stopped - } - return proc.State() -} - -func (c *container) Runtime() string { - return c.runtime -} - -func (c *container) Pause() error { - args := c.runtimeArgs - args = append(args, "pause", c.id) - b, err := exec.Command(c.runtime, args...).CombinedOutput() - if err != nil { - return fmt.Errorf(string(b)) - } - return nil -} - -func (c *container) Resume() error { - args := c.runtimeArgs - args = append(args, "resume", c.id) - b, err := exec.Command(c.runtime, args...).CombinedOutput() - if err != nil { - return fmt.Errorf(string(b)) - } - return nil -} - -func (c *container) Checkpoints() ([]Checkpoint, error) { - dirs, err := ioutil.ReadDir(filepath.Join(c.bundle, "checkpoints")) - if err != nil { - return nil, err - } - var out []Checkpoint - for _, d := range dirs { - if !d.IsDir() { - continue - } - path := filepath.Join(c.bundle, "checkpoints", d.Name(), "config.json") - data, err := ioutil.ReadFile(path) - if err != nil { - return nil, err - } - var cpt Checkpoint - if err := json.Unmarshal(data, &cpt); err != nil { - return nil, err - } - out = append(out, cpt) - } - return out, nil -} - -func (c *container) Checkpoint(cpt Checkpoint) error { - if err := os.MkdirAll(filepath.Join(c.bundle, "checkpoints"), 0755); err != nil { - return err - } - path := filepath.Join(c.bundle, "checkpoints", cpt.Name) - if err := os.Mkdir(path, 0755); err != nil { - return err - } - f, err := os.Create(filepath.Join(path, "config.json")) - if err != nil { - return err - } - cpt.Created = time.Now() - err = json.NewEncoder(f).Encode(cpt) - f.Close() - if err != nil { - return err - } - args := []string{ - "checkpoint", - "--image-path", path, - } - add := func(flags ...string) { - args = append(args, flags...) - } - add(c.runtimeArgs...) - if !cpt.Exit { - add("--leave-running") - } - if cpt.Shell { - add("--shell-job") - } - if cpt.Tcp { - add("--tcp-established") - } - if cpt.UnixSockets { - add("--ext-unix-sk") - } - add(c.id) - out, err := exec.Command(c.runtime, args...).CombinedOutput() - if err != nil { - return fmt.Errorf("%s: %s", err.Error(), string(out)) - } - return err -} - -func (c *container) DeleteCheckpoint(name string) error { - return os.RemoveAll(filepath.Join(c.bundle, "checkpoints", name)) -} - -func (c *container) Start(checkpoint string, s Stdio) (Process, error) { - processRoot := filepath.Join(c.root, c.id, InitProcessID) - if err := os.Mkdir(processRoot, 0755); err != nil { - return nil, err - } - cmd := exec.Command(c.shim, - c.id, c.bundle, c.runtime, - ) - cmd.Dir = processRoot - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - spec, err := c.readSpec() - if err != nil { - return nil, err - } - config := &processConfig{ - checkpoint: checkpoint, - root: processRoot, - id: InitProcessID, - c: c, - stdio: s, - spec: spec, - processSpec: specs.ProcessSpec(spec.Process), - } - p, err := newProcess(config) - if err != nil { - return nil, err - } - if err := c.startCmd(InitProcessID, cmd, p); err != nil { - return nil, err - } - return p, nil -} - -func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) { - processRoot := filepath.Join(c.root, c.id, pid) - if err := os.Mkdir(processRoot, 0755); err != nil { - return nil, err - } - defer func() { - if err != nil { - c.RemoveProcess(pid) - } - }() - cmd := exec.Command(c.shim, - c.id, c.bundle, c.runtime, - ) - cmd.Dir = processRoot - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - spec, err := c.readSpec() - if err != nil { - return nil, err - } - config := &processConfig{ - exec: true, - id: pid, - root: processRoot, - c: c, - processSpec: pspec, - spec: spec, - stdio: s, - } - p, err := newProcess(config) - if err != nil { - return nil, err - } - if err := c.startCmd(pid, cmd, p); err != nil { - return nil, err - } - return p, nil -} - -func (c *container) startCmd(pid string, cmd *exec.Cmd, p *process) error { - if err := cmd.Start(); err != nil { - if exErr, ok := err.(*exec.Error); ok { - if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist { - return fmt.Errorf("%s not installed on system", c.shim) - } - } - return err - } - if err := c.waitForStart(p, cmd); err != nil { - return err - } - c.processes[pid] = p - return nil -} - -func (c *container) getLibctContainer() (libcontainer.Container, error) { - runtimeRoot := "/run/runc" - - // Check that the root wasn't changed - for _, opt := range c.runtimeArgs { - if strings.HasPrefix(opt, "--root=") { - runtimeRoot = strings.TrimPrefix(opt, "--root=") - break - } - } - - f, err := libcontainer.New(runtimeRoot, libcontainer.Cgroupfs) - if err != nil { - return nil, err - } - return f.Load(c.id) -} - -func hostIDFromMap(id uint32, mp []ocs.IDMapping) int { - for _, m := range mp { - if (id >= m.ContainerID) && (id <= (m.ContainerID + m.Size - 1)) { - return int(m.HostID + (id - m.ContainerID)) - } - } - return 0 -} - -func (c *container) Pids() ([]int, error) { - container, err := c.getLibctContainer() - if err != nil { - return nil, err - } - return container.Processes() -} - -func (c *container) Stats() (*Stat, error) { - container, err := c.getLibctContainer() - if err != nil { - return nil, err - } - now := time.Now() - stats, err := container.Stats() - if err != nil { - return nil, err - } - return &Stat{ - Timestamp: now, - Data: stats, - }, nil -} - -// Status implements the runtime Container interface. -func (c *container) Status() (State, error) { - args := c.runtimeArgs - args = append(args, "state", c.id) - - out, err := exec.Command(c.runtime, args...).CombinedOutput() - if err != nil { - return "", fmt.Errorf(string(out)) - } - - // We only require the runtime json output to have a top level Status field. - var s struct { - Status State `json:"status"` - } - if err := json.Unmarshal(out, &s); err != nil { - return "", err - } - return s.Status, nil -} - -func (c *container) OOM() (OOM, error) { - container, err := c.getLibctContainer() - if err != nil { - if lerr, ok := err.(libcontainer.Error); ok { - // with oom registration sometimes the container can run, exit, and be destroyed - // faster than we can get the state back so we can just ignore this - if lerr.Code() == libcontainer.ContainerNotExists { - return nil, ErrContainerExited - } - } - return nil, err - } - state, err := container.State() - if err != nil { - return nil, err - } - memoryPath := state.CgroupPaths["memory"] - return c.getMemeoryEventFD(memoryPath) -} - -func (c *container) getMemeoryEventFD(root string) (*oom, error) { - f, err := os.Open(filepath.Join(root, "memory.oom_control")) - if err != nil { - return nil, err - } - fd, _, serr := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, syscall.FD_CLOEXEC, 0) - if serr != 0 { - f.Close() - return nil, serr - } - if err := c.writeEventFD(root, int(f.Fd()), int(fd)); err != nil { - syscall.Close(int(fd)) - f.Close() - return nil, err - } - return &oom{ - root: root, - id: c.id, - eventfd: int(fd), - control: f, - }, nil -} - -func (c *container) writeEventFD(root string, cfd, efd int) error { - f, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0) - if err != nil { - return err - } - defer f.Close() - _, err = f.WriteString(fmt.Sprintf("%d %d", efd, cfd)) - return err -} - -type waitArgs struct { - pid int - err error -} - -func (c *container) waitForStart(p *process, cmd *exec.Cmd) error { - wc := make(chan error, 1) - go func() { - for { - if _, err := p.getPidFromFile(); err != nil { - if os.IsNotExist(err) || err == errInvalidPidInt { - alive, err := isAlive(cmd) - if err != nil { - wc <- err - return - } - if !alive { - // runc could have failed to run the container so lets get the error - // out of the logs or the shim could have encountered an error - messages, err := readLogMessages(filepath.Join(p.root, "shim-log.json")) - if err != nil { - wc <- err - return - } - for _, m := range messages { - if m.Level == "error" { - wc <- fmt.Errorf("shim error: %v", m.Msg) - return - } - } - // no errors reported back from shim, check for runc/runtime errors - messages, err = readLogMessages(filepath.Join(p.root, "log.json")) - if err != nil { - if os.IsNotExist(err) { - err = ErrContainerNotStarted - } - wc <- err - return - } - for _, m := range messages { - if m.Level == "error" { - wc <- fmt.Errorf("oci runtime error: %v", m.Msg) - return - } - } - wc <- ErrContainerNotStarted - return - } - time.Sleep(15 * time.Millisecond) - continue - } - wc <- err - return - } - // the pid file was read successfully - wc <- nil - return - } - }() - select { - case err := <-wc: - if err != nil { - return err - } - return nil - case <-time.After(c.timeout): - cmd.Process.Kill() - cmd.Wait() - return ErrContainerStartTimeout - } -} - -// isAlive checks if the shim that launched the container is still alive -func isAlive(cmd *exec.Cmd) (bool, error) { - if err := syscall.Kill(cmd.Process.Pid, 0); err != nil { - if err == syscall.ESRCH { - return false, nil - } - return false, err - } - return true, nil -} - -type oom struct { - id string - root string - control *os.File - eventfd int -} - -func (o *oom) ContainerID() string { - return o.id -} - -func (o *oom) FD() int { - return o.eventfd -} - -func (o *oom) Flush() { - buf := make([]byte, 8) - syscall.Read(o.eventfd, buf) -} - -func (o *oom) Removed() bool { - _, err := os.Lstat(filepath.Join(o.root, "cgroup.event_control")) - return os.IsNotExist(err) -} - -func (o *oom) Close() error { - err := syscall.Close(o.eventfd) - if cerr := o.control.Close(); err == nil { - err = cerr - } - return err -} - -type message struct { - Level string `json:"level"` - Msg string `json:"msg"` -} - -func readLogMessages(path string) ([]message, error) { - var out []message - f, err := os.Open(path) - if err != nil { - return nil, err - } - defer f.Close() - dec := json.NewDecoder(f) - for { - var m message - if err := dec.Decode(&m); err != nil { - if err == io.EOF { - break - } - return nil, err - } - out = append(out, m) - } - return out, nil -} diff --git a/runtime/container_windows.go b/runtime/container_windows.go deleted file mode 100644 index 6a35400..0000000 --- a/runtime/container_windows.go +++ /dev/null @@ -1,71 +0,0 @@ -package runtime - -import ( - "errors" - - "github.com/docker/containerd/specs" -) - -func getRootIDs(s *specs.PlatformSpec) (int, int, error) { - return 0, 0, nil -} - -// TODO Windows: This will have a different implementation -func (c *container) State() State { - return Running // HACK HACK HACK -} - -func (c *container) Runtime() string { - return "windows" -} - -func (c *container) Pause() error { - return errors.New("Pause not supported on Windows") -} - -func (c *container) Resume() error { - return errors.New("Resume not supported on Windows") -} - -func (c *container) Checkpoints() ([]Checkpoint, error) { - return nil, errors.New("Checkpoints not supported on Windows ") -} - -func (c *container) Checkpoint(cpt Checkpoint) error { - return errors.New("Checkpoint not supported on Windows ") -} - -func (c *container) DeleteCheckpoint(name string) error { - return errors.New("DeleteCheckpoint not supported on Windows ") -} - -// TODO Windows: Implement me. -// This will have a very different implementation on Windows. -func (c *container) Start(checkpoint string, s Stdio) (Process, error) { - return nil, errors.New("Start not yet implemented on Windows") -} - -// TODO Windows: Implement me. -// This will have a very different implementation on Windows. -func (c *container) Exec(pid string, spec specs.ProcessSpec, s Stdio) (Process, error) { - return nil, errors.New("Exec not yet implemented on Windows") -} - -// TODO Windows: Implement me. -func (c *container) Pids() ([]int, error) { - return nil, errors.New("Pids not yet implemented on Windows") -} - -// TODO Windows: Implement me. (Not yet supported by docker on Windows either...) -func (c *container) Stats() (*Stat, error) { - return nil, errors.New("Stats not yet implemented on Windows") -} - -// Status implements the runtime Container interface. -func (c *container) Status() (State, error) { - return "", errors.New("Status not yet implemented on Windows") -} - -func (c *container) OOM() (OOM, error) { - return nil, errors.New("OOM not yet implemented on Windows") -} diff --git a/runtime/process.go b/runtime/process.go index 082996e..4b40b21 100644 --- a/runtime/process.go +++ b/runtime/process.go @@ -70,7 +70,21 @@ func newProcess(config *processConfig) (*process, error) { } defer f.Close() - ps := populateProcessStateForEncoding(config, uid, gid) + ps := ProcessState{ + ProcessSpec: config.processSpec, + Exec: config.exec, + PlatformProcessState: PlatformProcessState{ + Checkpoint: config.checkpoint, + RootUID: uid, + RootGID: gid, + }, + Stdin: config.stdio.Stdin, + Stdout: config.stdio.Stdout, + Stderr: config.stdio.Stderr, + RuntimeArgs: config.c.runtimeArgs, + NoPivotRoot: config.c.noPivotRoot, + } + if err := json.NewEncoder(f).Encode(ps); err != nil { return nil, err } @@ -204,3 +218,24 @@ func (p *process) getPidFromFile() (int, error) { p.pid = i return i, nil } + +func getExitPipe(path string) (*os.File, error) { + if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { + return nil, err + } + // add NONBLOCK in case the other side has already closed or else + // this function would never return + return os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) +} + +func getControlPipe(path string) (*os.File, error) { + if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { + return nil, err + } + return os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) +} + +// Signal sends the provided signal to the process +func (p *process) Signal(s os.Signal) error { + return syscall.Kill(p.pid, s.(syscall.Signal)) +} diff --git a/runtime/process_linux.go b/runtime/process_linux.go deleted file mode 100644 index 44c9a95..0000000 --- a/runtime/process_linux.go +++ /dev/null @@ -1,44 +0,0 @@ -package runtime - -import ( - "os" - "syscall" -) - -func getExitPipe(path string) (*os.File, error) { - if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { - return nil, err - } - // add NONBLOCK in case the other side has already closed or else - // this function would never return - return os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) -} - -func getControlPipe(path string) (*os.File, error) { - if err := syscall.Mkfifo(path, 0755); err != nil && !os.IsExist(err) { - return nil, err - } - return os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) -} - -// Signal sends the provided signal to the process -func (p *process) Signal(s os.Signal) error { - return syscall.Kill(p.pid, s.(syscall.Signal)) -} - -func populateProcessStateForEncoding(config *processConfig, uid int, gid int) ProcessState { - return ProcessState{ - ProcessSpec: config.processSpec, - Exec: config.exec, - PlatformProcessState: PlatformProcessState{ - Checkpoint: config.checkpoint, - RootUID: uid, - RootGID: gid, - }, - Stdin: config.stdio.Stdin, - Stdout: config.stdio.Stdout, - Stderr: config.stdio.Stderr, - RuntimeArgs: config.c.runtimeArgs, - NoPivotRoot: config.c.noPivotRoot, - } -} diff --git a/runtime/process_windows.go b/runtime/process_windows.go deleted file mode 100644 index e435352..0000000 --- a/runtime/process_windows.go +++ /dev/null @@ -1,29 +0,0 @@ -package runtime - -import "os" - -// TODO Windows: Linux uses syscalls which don't map to Windows. Needs alternate mechanism -func getExitPipe(path string) (*os.File, error) { - return nil, nil -} - -// TODO Windows: Linux uses syscalls which don't map to Windows. Needs alternate mechanism -func getControlPipe(path string) (*os.File, error) { - return nil, nil -} - -// TODO Windows. Windows does not support signals. Need alternate mechanism -// Signal sends the provided signal to the process -func (p *process) Signal(s os.Signal) error { - return nil -} - -func populateProcessStateForEncoding(config *processConfig, uid int, gid int) ProcessState { - return ProcessState{ - ProcessSpec: config.processSpec, - Exec: config.exec, - Stdin: config.stdio.Stdin, - Stdout: config.stdio.Stdout, - Stderr: config.stdio.Stderr, - } -} diff --git a/runtime/runtime.go b/runtime/runtime.go index 77a0865..fff7b29 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -32,6 +32,27 @@ const ( InitProcessID = "init" ) +type Checkpoint struct { + // Timestamp is the time that checkpoint happened + Created time.Time `json:"created"` + // Name is the name of the checkpoint + Name string `json:"name"` + // Tcp checkpoints open tcp connections + Tcp bool `json:"tcp"` + // UnixSockets persists unix sockets in the checkpoint + UnixSockets bool `json:"unixSockets"` + // Shell persists tty sessions in the checkpoint + Shell bool `json:"shell"` + // Exit exits the container after the checkpoint is finished + Exit bool `json:"exit"` +} + +// PlatformProcessState container platform-specific fields in the ProcessState structure +type PlatformProcessState struct { + Checkpoint string `json:"checkpoint"` + RootUID int `json:"rootUID"` + RootGID int `json:"rootGID"` +} type State string type Resource struct { diff --git a/runtime/runtime_linux.go b/runtime/runtime_linux.go deleted file mode 100644 index b2d925a..0000000 --- a/runtime/runtime_linux.go +++ /dev/null @@ -1,25 +0,0 @@ -package runtime - -import "time" - -type Checkpoint struct { - // Timestamp is the time that checkpoint happened - Created time.Time `json:"created"` - // Name is the name of the checkpoint - Name string `json:"name"` - // Tcp checkpoints open tcp connections - Tcp bool `json:"tcp"` - // UnixSockets persists unix sockets in the checkpoint - UnixSockets bool `json:"unixSockets"` - // Shell persists tty sessions in the checkpoint - Shell bool `json:"shell"` - // Exit exits the container after the checkpoint is finished - Exit bool `json:"exit"` -} - -// PlatformProcessState container platform-specific fields in the ProcessState structure -type PlatformProcessState struct { - Checkpoint string `json:"checkpoint"` - RootUID int `json:"rootUID"` - RootGID int `json:"rootGID"` -} diff --git a/runtime/runtime_windows.go b/runtime/runtime_windows.go deleted file mode 100644 index b3d9218..0000000 --- a/runtime/runtime_windows.go +++ /dev/null @@ -1,10 +0,0 @@ -package runtime - -// Checkpoint is not supported on Windows. -// TODO Windows: Can eventually be factored out entirely. -type Checkpoint struct { -} - -// PlatformProcessState container platform-specific fields in the ProcessState structure -type PlatformProcessState struct { -} diff --git a/specs/spec_windows.go b/specs/spec_windows.go deleted file mode 100644 index 006a0ef..0000000 --- a/specs/spec_windows.go +++ /dev/null @@ -1,56 +0,0 @@ -package specs - -// Temporary Windows version of the spec in lieu of opencontainers/runtime-spec/specs-go having -// Windows support currently. - -type ( - PlatformSpec WindowsSpec - ProcessSpec Process -) - -// This is a temporary module in lieu of opencontainers/runtime-spec/specs-go being compatible -// currently on Windows. - -// Process contains information to start a specific application inside the container. -type Process struct { - // Terminal creates an interactive terminal for the container. - Terminal bool `json:"terminal"` - // User specifies user information for the process. - // TEMPORARY HACK User User `json:"user"` - // Args specifies the binary and arguments for the application to execute. - Args []string `json:"args"` - // Env populates the process environment for the process. - Env []string `json:"env,omitempty"` - // Cwd is the current working directory for the process and must be - // relative to the container's root. - Cwd string `json:"cwd"` -} - -type Spec struct { - // Version is the version of the specification that is supported. - Version string `json:"ociVersion"` - // Platform is the host information for OS and Arch. - // TEMPORARY HACK Platform Platform `json:"platform"` - // Process is the container's main process. - Process Process `json:"process"` - // Root is the root information for the container's filesystem. - // TEMPORARY HACK Root Root `json:"root"` - // Hostname is the container's host name. - // TEMPORARY HACK Hostname string `json:"hostname,omitempty"` - // Mounts profile configuration for adding mounts to the container's filesystem. - // TEMPORARY HACK Mounts []Mount `json:"mounts"` - // Hooks are the commands run at various lifecycle events of the container. - // TEMPORARY HACK Hooks Hooks `json:"hooks"` -} - -// Windows contains platform specific configuration for Windows based containers. -type Windows struct { -} - -// TODO Windows - Interim hack. Needs implementing. -type WindowsSpec struct { - Spec - - // Windows is platform specific configuration for Windows based containers. - Windows Windows `json:"windows"` -} diff --git a/supervisor/create.go b/supervisor/create.go index e51fe8d..817ed37 100644 --- a/supervisor/create.go +++ b/supervisor/create.go @@ -8,7 +8,6 @@ import ( type StartTask struct { baseTask - platformStartTask ID string BundlePath string Stdout string @@ -17,6 +16,7 @@ type StartTask struct { StartResponse chan StartResponse Labels []string NoPivotRoot bool + Checkpoint *runtime.Checkpoint } func (s *Supervisor) start(t *StartTask) error { @@ -47,7 +47,9 @@ func (s *Supervisor) start(t *StartTask) error { Stdout: t.Stdout, Stderr: t.Stderr, } - task.setTaskCheckpoint(t) + if t.Checkpoint != nil { + task.Checkpoint = t.Checkpoint.Name + } s.startTasks <- task ContainerCreateTimer.UpdateSince(start) diff --git a/supervisor/create_linux.go b/supervisor/create_linux.go deleted file mode 100644 index 67c570c..0000000 --- a/supervisor/create_linux.go +++ /dev/null @@ -1,13 +0,0 @@ -package supervisor - -import "github.com/docker/containerd/runtime" - -type platformStartTask struct { - Checkpoint *runtime.Checkpoint -} - -func (task *startTask) setTaskCheckpoint(t *StartTask) { - if t.Checkpoint != nil { - task.Checkpoint = t.Checkpoint.Name - } -} diff --git a/supervisor/create_windows.go b/supervisor/create_windows.go deleted file mode 100644 index 2d76396..0000000 --- a/supervisor/create_windows.go +++ /dev/null @@ -1,8 +0,0 @@ -package supervisor - -type platformStartTask struct { -} - -// Checkpoint not supported on Windows -func (task *startTask) setTaskCheckpoint(t *StartTask) { -} diff --git a/supervisor/machine.go b/supervisor/machine.go index f6045a7..1dcada5 100644 --- a/supervisor/machine.go +++ b/supervisor/machine.go @@ -1,6 +1,23 @@ package supervisor +import "github.com/cloudfoundry/gosigar" + type Machine struct { Cpus int Memory int64 } + +func CollectMachineInformation() (Machine, error) { + m := Machine{} + cpu := sigar.CpuList{} + if err := cpu.Get(); err != nil { + return m, err + } + m.Cpus = len(cpu.List) + mem := sigar.Mem{} + if err := mem.Get(); err != nil { + return m, err + } + m.Memory = int64(mem.Total / 1024 / 1024) + return m, nil +} diff --git a/supervisor/machine_linux.go b/supervisor/machine_linux.go deleted file mode 100644 index 5cbec23..0000000 --- a/supervisor/machine_linux.go +++ /dev/null @@ -1,18 +0,0 @@ -package supervisor - -import "github.com/cloudfoundry/gosigar" - -func CollectMachineInformation() (Machine, error) { - m := Machine{} - cpu := sigar.CpuList{} - if err := cpu.Get(); err != nil { - return m, err - } - m.Cpus = len(cpu.List) - mem := sigar.Mem{} - if err := mem.Get(); err != nil { - return m, err - } - m.Memory = int64(mem.Total / 1024 / 1024) - return m, nil -} diff --git a/supervisor/machine_windows.go b/supervisor/machine_windows.go deleted file mode 100644 index 90d47c8..0000000 --- a/supervisor/machine_windows.go +++ /dev/null @@ -1,5 +0,0 @@ -package supervisor - -func CollectMachineInformation() (Machine, error) { - return Machine{}, nil -} diff --git a/supervisor/monitor_windows.go b/supervisor/monitor_windows.go deleted file mode 100644 index 71db76e..0000000 --- a/supervisor/monitor_windows.go +++ /dev/null @@ -1,34 +0,0 @@ -package supervisor - -import ( - "errors" - - "github.com/docker/containerd/runtime" -) - -// TODO Windows: This is going to be very problematic to port to Windows. -// Windows golang has no concept of EpollEvent/EpollCtl etc as in the -// Linux implementation. @crosbymichael - Help needed. - -func NewMonitor() (*Monitor, error) { - // During Windows bring-up, don't error out other binary bombs immediately. - return &Monitor{}, nil -} - -type Monitor struct { -} - -func (m *Monitor) Exits() chan runtime.Process { - return nil -} - -func (m *Monitor) Monitor(p runtime.Process) error { - return errors.New("Monitor not implemented on Windows") -} - -func (m *Monitor) Close() error { - return errors.New("Monitor Close() not implemented on Windows") -} - -func (m *Monitor) start() { -} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 665ef5b..ba80632 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -345,3 +345,41 @@ func (s *Supervisor) restore() error { } return nil } + +func (s *Supervisor) handleTask(i Task) { + var err error + switch t := i.(type) { + case *AddProcessTask: + err = s.addProcess(t) + case *CreateCheckpointTask: + err = s.createCheckpoint(t) + case *DeleteCheckpointTask: + err = s.deleteCheckpoint(t) + case *StartTask: + err = s.start(t) + case *DeleteTask: + err = s.delete(t) + case *ExitTask: + err = s.exit(t) + case *ExecExitTask: + err = s.execExit(t) + case *GetContainersTask: + err = s.getContainers(t) + case *SignalTask: + err = s.signal(t) + case *StatsTask: + err = s.stats(t) + case *UpdateTask: + err = s.updateContainer(t) + case *UpdateProcessTask: + err = s.updateProcess(t) + case *OOMTask: + err = s.oom(t) + default: + err = ErrUnknownTask + } + if err != errDeferredResponse { + i.ErrorCh() <- err + close(i.ErrorCh()) + } +} diff --git a/supervisor/supervisor_linux.go b/supervisor/supervisor_linux.go deleted file mode 100644 index a2dd041..0000000 --- a/supervisor/supervisor_linux.go +++ /dev/null @@ -1,39 +0,0 @@ -package supervisor - -func (s *Supervisor) handleTask(i Task) { - var err error - switch t := i.(type) { - case *AddProcessTask: - err = s.addProcess(t) - case *CreateCheckpointTask: - err = s.createCheckpoint(t) - case *DeleteCheckpointTask: - err = s.deleteCheckpoint(t) - case *StartTask: - err = s.start(t) - case *DeleteTask: - err = s.delete(t) - case *ExitTask: - err = s.exit(t) - case *ExecExitTask: - err = s.execExit(t) - case *GetContainersTask: - err = s.getContainers(t) - case *SignalTask: - err = s.signal(t) - case *StatsTask: - err = s.stats(t) - case *UpdateTask: - err = s.updateContainer(t) - case *UpdateProcessTask: - err = s.updateProcess(t) - case *OOMTask: - err = s.oom(t) - default: - err = ErrUnknownTask - } - if err != errDeferredResponse { - i.ErrorCh() <- err - close(i.ErrorCh()) - } -} diff --git a/supervisor/supervisor_windows.go b/supervisor/supervisor_windows.go deleted file mode 100644 index 2882577..0000000 --- a/supervisor/supervisor_windows.go +++ /dev/null @@ -1,33 +0,0 @@ -package supervisor - -func (s *Supervisor) handleTask(i Task) { - var err error - switch t := i.(type) { - case *AddProcessTask: - err = s.addProcess(t) - case *StartTask: - err = s.start(t) - case *DeleteTask: - err = s.delete(t) - case *ExitTask: - err = s.exit(t) - case *ExecExitTask: - err = s.execExit(t) - case *GetContainersTask: - err = s.getContainers(t) - case *SignalTask: - err = s.signal(t) - case *StatsTask: - err = s.stats(t) - case *UpdateTask: - err = s.updateContainer(t) - case *UpdateProcessTask: - err = s.updateProcess(t) - default: - err = ErrUnknownTask - } - if err != errDeferredResponse { - i.ErrorCh() <- err - close(i.ErrorCh()) - } -}