Remove windows code and simplify linux

Windows will not use containerd, and it's just unused code and unneeded
complexity to keep it all around.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-04-26 13:29:35 -07:00
parent 31270bba69
commit f7f4d8677f
20 changed files with 989 additions and 1165 deletions

View file

@ -1,8 +1,12 @@
package server package server
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"os"
"strconv"
"strings"
"syscall" "syscall"
"time" "time"
@ -13,6 +17,10 @@ import (
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/docker/containerd/specs" "github.com/docker/containerd/specs"
"github.com/docker/containerd/supervisor" "github.com/docker/containerd/supervisor"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/system"
ocs "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -40,7 +48,11 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
e.Labels = c.Labels e.Labels = c.Labels
e.NoPivotRoot = c.NoPivotRoot e.NoPivotRoot = c.NoPivotRoot
e.StartResponse = make(chan supervisor.StartResponse, 1) e.StartResponse = make(chan supervisor.StartResponse, 1)
createContainerConfigCheckpoint(e, c) if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint,
}
}
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
@ -55,6 +67,73 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
}, nil }, nil
} }
// CreateCheckpoint asks the supervisor to checkpoint the container named by
// r.Id using the options carried in r.Checkpoint.
//
// It returns an error when the request carries no checkpoint description, or
// the error reported on the task's error channel.
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
	// The nested checkpoint message is nil when the client omits it; reject
	// the request instead of dereferencing a nil pointer below.
	if r.Checkpoint == nil {
		return nil, errors.New("checkpoint cannot be empty")
	}
	e := &supervisor.CreateCheckpointTask{}
	e.ID = r.Id
	e.Checkpoint = &runtime.Checkpoint{
		Name:        r.Checkpoint.Name,
		Exit:        r.Checkpoint.Exit,
		Tcp:         r.Checkpoint.Tcp,
		UnixSockets: r.Checkpoint.UnixSockets,
		Shell:       r.Checkpoint.Shell,
	}
	s.sv.SendTask(e)
	if err := <-e.ErrorCh(); err != nil {
		return nil, err
	}
	return &types.CreateCheckpointResponse{}, nil
}
// DeleteCheckpoint submits a task to the supervisor that deletes the
// checkpoint r.Name of container r.Id. An empty checkpoint name is rejected
// before any task is sent.
func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
	if len(r.Name) == 0 {
		return nil, errors.New("checkpoint name cannot be empty")
	}
	task := &supervisor.DeleteCheckpointTask{}
	task.ID = r.Id
	task.Checkpoint = &runtime.Checkpoint{Name: r.Name}
	s.sv.SendTask(task)
	// Block until the supervisor reports the outcome of the task.
	err := <-task.ErrorCh()
	if err != nil {
		return nil, err
	}
	return &types.DeleteCheckpointResponse{}, nil
}
// ListCheckpoint returns the checkpoints of the container whose ID equals
// r.Id. It fetches all containers from the supervisor, picks the matching
// one, and converts each of its checkpoints to the API type. A NotFound gRPC
// error is returned when no container matches.
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
	task := &supervisor.GetContainersTask{}
	s.sv.SendTask(task)
	if err := <-task.ErrorCh(); err != nil {
		return nil, err
	}

	// Find the first container with the requested ID.
	var target runtime.Container
	for _, candidate := range task.Containers {
		if candidate.ID() != r.Id {
			continue
		}
		target = candidate
		break
	}
	if target == nil {
		return nil, grpc.Errorf(codes.NotFound, "no such containers")
	}

	checkpoints, err := target.Checkpoints()
	if err != nil {
		return nil, err
	}
	var out []*types.Checkpoint
	for i := range checkpoints {
		cpt := checkpoints[i]
		// TODO: figure out timestamp (Timestamp: cpt.Timestamp)
		out = append(out, &types.Checkpoint{
			Name:        cpt.Name,
			Tcp:         cpt.Tcp,
			Shell:       cpt.Shell,
			UnixSockets: cpt.UnixSockets,
		})
	}
	return &types.ListCheckpointResponse{Checkpoints: out}, nil
}
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
e := &supervisor.SignalTask{} e := &supervisor.SignalTask{}
e.ID = r.Id e.ID = r.Id
@ -74,8 +153,22 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
Env: r.Env, Env: r.Env,
Cwd: r.Cwd, Cwd: r.Cwd,
} }
setPlatformRuntimeProcessSpecUserFields(r, process) process.User = ocs.User{
UID: r.User.Uid,
GID: r.User.Gid,
AdditionalGids: r.User.AdditionalGids,
}
process.Capabilities = r.Capabilities
process.ApparmorProfile = r.ApparmorProfile
process.SelinuxLabel = r.SelinuxLabel
process.NoNewPrivileges = r.NoNewPrivileges
for _, rl := range r.Rlimits {
process.Rlimits = append(process.Rlimits, ocs.Rlimit{
Type: rl.Type,
Soft: rl.Soft,
Hard: rl.Hard,
})
}
if r.Id == "" { if r.Id == "" {
return nil, fmt.Errorf("container id cannot be empty") return nil, fmt.Errorf("container id cannot be empty")
} }
@ -131,7 +224,7 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er
for _, p := range processes { for _, p := range processes {
oldProc := p.Spec() oldProc := p.Spec()
stdio := p.Stdio() stdio := p.Stdio()
appendToProcs := &types.Process{ proc := &types.Process{
Pid: p.ID(), Pid: p.ID(),
SystemPid: uint32(p.SystemPid()), SystemPid: uint32(p.SystemPid()),
Terminal: oldProc.Terminal, Terminal: oldProc.Terminal,
@ -142,8 +235,23 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er
Stdout: stdio.Stdout, Stdout: stdio.Stdout,
Stderr: stdio.Stderr, Stderr: stdio.Stderr,
} }
setUserFieldsInProcess(appendToProcs, oldProc) proc.User = &types.User{
procs = append(procs, appendToProcs) Uid: oldProc.User.UID,
Gid: oldProc.User.GID,
AdditionalGids: oldProc.User.AdditionalGids,
}
proc.Capabilities = oldProc.Capabilities
proc.ApparmorProfile = oldProc.ApparmorProfile
proc.SelinuxLabel = oldProc.SelinuxLabel
proc.NoNewPrivileges = oldProc.NoNewPrivileges
for _, rl := range oldProc.Rlimits {
proc.Rlimits = append(proc.Rlimits, &types.Rlimit{
Type: rl.Type,
Soft: rl.Soft,
Hard: rl.Hard,
})
}
procs = append(procs, proc)
} }
var pids []int var pids []int
state, err := c.Status() state, err := c.Status()
@ -254,3 +362,153 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
} }
return nil return nil
} }
// convertToPb translates a runtime.Stat into the protobuf StatsResponse.
//
// The timestamp is always filled in. The cgroup sections (cpu, memory, blkio,
// hugetlb, pids) are only populated when st.Data holds *libcontainer.Stats;
// otherwise the response carries an empty CgroupStats.
func convertToPb(st *runtime.Stat) *types.StatsResponse {
	resp := &types.StatsResponse{
		Timestamp:   uint64(st.Timestamp.Unix()),
		CgroupStats: &types.CgroupStats{},
	}
	libStats, ok := st.Data.(*libcontainer.Stats)
	if !ok {
		return resp
	}
	out := resp.CgroupStats

	// CPU: the error from getSystemCPUUsage is deliberately dropped, leaving
	// SystemUsage at whatever value was returned alongside it.
	cpu := libStats.CgroupStats.CpuStats
	hostUsage, _ := getSystemCPUUsage()
	out.CpuStats = &types.CpuStats{
		CpuUsage: &types.CpuUsage{
			TotalUsage:        cpu.CpuUsage.TotalUsage,
			PercpuUsage:       cpu.CpuUsage.PercpuUsage,
			UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode,
			UsageInUsermode:   cpu.CpuUsage.UsageInUsermode,
		},
		ThrottlingData: &types.ThrottlingData{
			Periods:          cpu.ThrottlingData.Periods,
			ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods,
			ThrottledTime:    cpu.ThrottlingData.ThrottledTime,
		},
		SystemUsage: hostUsage,
	}

	// Memory: the three usage sections share one shape, so convert them
	// through a single local helper.
	memData := func(d cgroups.MemoryData) *types.MemoryData {
		return &types.MemoryData{
			Usage:    d.Usage,
			MaxUsage: d.MaxUsage,
			Failcnt:  d.Failcnt,
			Limit:    d.Limit,
		}
	}
	mem := libStats.CgroupStats.MemoryStats
	out.MemoryStats = &types.MemoryStats{
		Cache:       mem.Cache,
		Usage:       memData(mem.Usage),
		SwapUsage:   memData(mem.SwapUsage),
		KernelUsage: memData(mem.KernelUsage),
	}

	// Block I/O.
	blk := libStats.CgroupStats.BlkioStats
	out.BlkioStats = &types.BlkioStats{
		IoServiceBytesRecursive: convertBlkioEntryToPb(blk.IoServiceBytesRecursive),
		IoServicedRecursive:     convertBlkioEntryToPb(blk.IoServicedRecursive),
		IoQueuedRecursive:       convertBlkioEntryToPb(blk.IoQueuedRecursive),
		IoServiceTimeRecursive:  convertBlkioEntryToPb(blk.IoServiceTimeRecursive),
		IoWaitTimeRecursive:     convertBlkioEntryToPb(blk.IoWaitTimeRecursive),
		IoMergedRecursive:       convertBlkioEntryToPb(blk.IoMergedRecursive),
		IoTimeRecursive:         convertBlkioEntryToPb(blk.IoTimeRecursive),
		SectorsRecursive:        convertBlkioEntryToPb(blk.SectorsRecursive),
	}

	// Huge pages, keyed exactly as in the libcontainer stats.
	huge := make(map[string]*types.HugetlbStats)
	for key, hs := range libStats.CgroupStats.HugetlbStats {
		huge[key] = &types.HugetlbStats{
			Usage:    hs.Usage,
			MaxUsage: hs.MaxUsage,
			Failcnt:  hs.Failcnt,
		}
	}
	out.HugetlbStats = huge

	// Pids.
	pids := libStats.CgroupStats.PidsStats
	out.PidsStats = &types.PidsStats{
		Current: pids.Current,
		Limit:   pids.Limit,
	}
	return resp
}
// convertBlkioEntryToPb copies each blkio stat entry into its protobuf
// counterpart, preserving order. The result is nil when b is empty.
func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry {
	var entries []*types.BlkioStatsEntry
	for i := range b {
		entries = append(entries, &types.BlkioStatsEntry{
			Major: b[i].Major,
			Minor: b[i].Minor,
			Op:    b[i].Op,
			Value: b[i].Value,
		})
	}
	return entries
}
// nanoSecondsPerSecond is the factor getSystemCPUUsage uses to express a
// number of seconds in nanoseconds.
const nanoSecondsPerSecond = 1e9
// clockTicksPerSecond is the value reported by system.GetClockTicks,
// evaluated once when the package is initialised.
var clockTicksPerSecond = uint64(system.GetClockTicks())
// getSystemCPUUsage returns the host system's cpu usage in
// nanoseconds. An error is returned if the format of the underlying
// file does not match.
//
// Uses /proc/stat defined by POSIX. Looks for the cpu
// statistics line and then sums up the first seven fields
// provided. See `man 5 proc` for details on specific field
// information.
func getSystemCPUUsage() (uint64, error) {
var line string
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
bufReader := bufio.NewReaderSize(nil, 128)
defer func() {
bufReader.Reset(nil)
f.Close()
}()
bufReader.Reset(f)
err = nil
for err == nil {
line, err = bufReader.ReadString('\n')
if err != nil {
break
}
parts := strings.Fields(line)
switch parts[0] {
case "cpu":
if len(parts) < 8 {
return 0, fmt.Errorf("bad format of cpu stats")
}
var totalClockTicks uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)
if err != nil {
return 0, fmt.Errorf("error parsing cpu stats")
}
totalClockTicks += v
}
return (totalClockTicks * nanoSecondsPerSecond) /
clockTicksPerSecond, nil
}
}
return 0, fmt.Errorf("bad stats format")
}
// Stats requests statistics for container r.Id from the supervisor and
// returns them converted to the protobuf response type.
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
	task := &supervisor.StatsTask{}
	task.ID = r.Id
	// Buffered with capacity one, as in the task setup this handler owns.
	task.Stat = make(chan *runtime.Stat, 1)
	s.sv.SendTask(task)
	err := <-task.ErrorCh()
	if err != nil {
		return nil, err
	}
	return convertToPb(<-task.Stat), nil
}

View file

@ -1,285 +0,0 @@
package server
import (
"bufio"
"errors"
"fmt"
"os"
"strconv"
"strings"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/specs"
"github.com/docker/containerd/supervisor"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/system"
ocs "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// createContainerConfigCheckpoint sets e.Checkpoint from the checkpoint name
// in the create request. The task is left untouched when the name is empty.
func createContainerConfigCheckpoint(e *supervisor.StartTask, c *types.CreateContainerRequest) {
	if c.Checkpoint == "" {
		return
	}
	e.Checkpoint = &runtime.Checkpoint{Name: c.Checkpoint}
}
// CreateCheckpoint asks the supervisor to checkpoint the container named by
// r.Id using the options carried in r.Checkpoint.
//
// It returns an error when the request carries no checkpoint description, or
// the error reported on the task's error channel.
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
	// The nested checkpoint message is nil when the client omits it; reject
	// the request instead of dereferencing a nil pointer below.
	if r.Checkpoint == nil {
		return nil, errors.New("checkpoint cannot be empty")
	}
	e := &supervisor.CreateCheckpointTask{}
	e.ID = r.Id
	e.Checkpoint = &runtime.Checkpoint{
		Name:        r.Checkpoint.Name,
		Exit:        r.Checkpoint.Exit,
		Tcp:         r.Checkpoint.Tcp,
		UnixSockets: r.Checkpoint.UnixSockets,
		Shell:       r.Checkpoint.Shell,
	}
	s.sv.SendTask(e)
	if err := <-e.ErrorCh(); err != nil {
		return nil, err
	}
	return &types.CreateCheckpointResponse{}, nil
}
// DeleteCheckpoint submits a task to the supervisor that deletes the
// checkpoint r.Name of container r.Id. An empty checkpoint name is rejected
// before any task is sent.
func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
	if len(r.Name) == 0 {
		return nil, errors.New("checkpoint name cannot be empty")
	}
	task := &supervisor.DeleteCheckpointTask{}
	task.ID = r.Id
	task.Checkpoint = &runtime.Checkpoint{Name: r.Name}
	s.sv.SendTask(task)
	// Block until the supervisor reports the outcome of the task.
	err := <-task.ErrorCh()
	if err != nil {
		return nil, err
	}
	return &types.DeleteCheckpointResponse{}, nil
}
// ListCheckpoint returns the checkpoints of the container whose ID equals
// r.Id. It fetches all containers from the supervisor, picks the matching
// one, and converts each of its checkpoints to the API type. A NotFound gRPC
// error is returned when no container matches.
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
	task := &supervisor.GetContainersTask{}
	s.sv.SendTask(task)
	if err := <-task.ErrorCh(); err != nil {
		return nil, err
	}

	// Find the first container with the requested ID.
	var target runtime.Container
	for _, candidate := range task.Containers {
		if candidate.ID() != r.Id {
			continue
		}
		target = candidate
		break
	}
	if target == nil {
		return nil, grpc.Errorf(codes.NotFound, "no such containers")
	}

	checkpoints, err := target.Checkpoints()
	if err != nil {
		return nil, err
	}
	var out []*types.Checkpoint
	for i := range checkpoints {
		cpt := checkpoints[i]
		// TODO: figure out timestamp (Timestamp: cpt.Timestamp)
		out = append(out, &types.Checkpoint{
			Name:        cpt.Name,
			Tcp:         cpt.Tcp,
			Shell:       cpt.Shell,
			UnixSockets: cpt.UnixSockets,
		})
	}
	return &types.ListCheckpointResponse{Checkpoints: out}, nil
}
// convertToPb translates a runtime.Stat into the protobuf StatsResponse.
//
// The timestamp is always filled in. The cgroup sections (cpu, memory, blkio,
// hugetlb, pids) are only populated when st.Data holds *libcontainer.Stats;
// otherwise the response carries an empty CgroupStats.
func convertToPb(st *runtime.Stat) *types.StatsResponse {
	resp := &types.StatsResponse{
		Timestamp:   uint64(st.Timestamp.Unix()),
		CgroupStats: &types.CgroupStats{},
	}
	libStats, ok := st.Data.(*libcontainer.Stats)
	if !ok {
		return resp
	}
	out := resp.CgroupStats

	// CPU: the error from getSystemCPUUsage is deliberately dropped, leaving
	// SystemUsage at whatever value was returned alongside it.
	cpu := libStats.CgroupStats.CpuStats
	hostUsage, _ := getSystemCPUUsage()
	out.CpuStats = &types.CpuStats{
		CpuUsage: &types.CpuUsage{
			TotalUsage:        cpu.CpuUsage.TotalUsage,
			PercpuUsage:       cpu.CpuUsage.PercpuUsage,
			UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode,
			UsageInUsermode:   cpu.CpuUsage.UsageInUsermode,
		},
		ThrottlingData: &types.ThrottlingData{
			Periods:          cpu.ThrottlingData.Periods,
			ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods,
			ThrottledTime:    cpu.ThrottlingData.ThrottledTime,
		},
		SystemUsage: hostUsage,
	}

	// Memory: the three usage sections share one shape, so convert them
	// through a single local helper.
	memData := func(d cgroups.MemoryData) *types.MemoryData {
		return &types.MemoryData{
			Usage:    d.Usage,
			MaxUsage: d.MaxUsage,
			Failcnt:  d.Failcnt,
			Limit:    d.Limit,
		}
	}
	mem := libStats.CgroupStats.MemoryStats
	out.MemoryStats = &types.MemoryStats{
		Cache:       mem.Cache,
		Usage:       memData(mem.Usage),
		SwapUsage:   memData(mem.SwapUsage),
		KernelUsage: memData(mem.KernelUsage),
	}

	// Block I/O.
	blk := libStats.CgroupStats.BlkioStats
	out.BlkioStats = &types.BlkioStats{
		IoServiceBytesRecursive: convertBlkioEntryToPb(blk.IoServiceBytesRecursive),
		IoServicedRecursive:     convertBlkioEntryToPb(blk.IoServicedRecursive),
		IoQueuedRecursive:       convertBlkioEntryToPb(blk.IoQueuedRecursive),
		IoServiceTimeRecursive:  convertBlkioEntryToPb(blk.IoServiceTimeRecursive),
		IoWaitTimeRecursive:     convertBlkioEntryToPb(blk.IoWaitTimeRecursive),
		IoMergedRecursive:       convertBlkioEntryToPb(blk.IoMergedRecursive),
		IoTimeRecursive:         convertBlkioEntryToPb(blk.IoTimeRecursive),
		SectorsRecursive:        convertBlkioEntryToPb(blk.SectorsRecursive),
	}

	// Huge pages, keyed exactly as in the libcontainer stats.
	huge := make(map[string]*types.HugetlbStats)
	for key, hs := range libStats.CgroupStats.HugetlbStats {
		huge[key] = &types.HugetlbStats{
			Usage:    hs.Usage,
			MaxUsage: hs.MaxUsage,
			Failcnt:  hs.Failcnt,
		}
	}
	out.HugetlbStats = huge

	// Pids.
	pids := libStats.CgroupStats.PidsStats
	out.PidsStats = &types.PidsStats{
		Current: pids.Current,
		Limit:   pids.Limit,
	}
	return resp
}
// convertBlkioEntryToPb copies each blkio stat entry into its protobuf
// counterpart, preserving order. The result is nil when b is empty.
func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry {
	var entries []*types.BlkioStatsEntry
	for i := range b {
		entries = append(entries, &types.BlkioStatsEntry{
			Major: b[i].Major,
			Minor: b[i].Minor,
			Op:    b[i].Op,
			Value: b[i].Value,
		})
	}
	return entries
}
// nanoSecondsPerSecond is the factor getSystemCPUUsage uses to express a
// number of seconds in nanoseconds.
const nanoSecondsPerSecond = 1e9
// clockTicksPerSecond is the value reported by system.GetClockTicks,
// evaluated once when the package is initialised.
var clockTicksPerSecond = uint64(system.GetClockTicks())
// getSystemCPUUsage returns the host system's cpu usage in
// nanoseconds. An error is returned if the format of the underlying
// file does not match.
//
// Uses /proc/stat defined by POSIX. Looks for the cpu
// statistics line and then sums up the first seven fields
// provided. See `man 5 proc` for details on specific field
// information.
func getSystemCPUUsage() (uint64, error) {
var line string
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
bufReader := bufio.NewReaderSize(nil, 128)
defer func() {
bufReader.Reset(nil)
f.Close()
}()
bufReader.Reset(f)
err = nil
for err == nil {
line, err = bufReader.ReadString('\n')
if err != nil {
break
}
parts := strings.Fields(line)
switch parts[0] {
case "cpu":
if len(parts) < 8 {
return 0, fmt.Errorf("bad format of cpu stats")
}
var totalClockTicks uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)
if err != nil {
return 0, fmt.Errorf("error parsing cpu stats")
}
totalClockTicks += v
}
return (totalClockTicks * nanoSecondsPerSecond) /
clockTicksPerSecond, nil
}
}
return 0, fmt.Errorf("bad stats format")
}
// Stats requests statistics for container r.Id from the supervisor and
// returns them converted to the protobuf response type.
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
	task := &supervisor.StatsTask{}
	task.ID = r.Id
	// Buffered with capacity one, as in the task setup this handler owns.
	task.Stat = make(chan *runtime.Stat, 1)
	s.sv.SendTask(task)
	err := <-task.ErrorCh()
	if err != nil {
		return nil, err
	}
	return convertToPb(<-task.Stat), nil
}
// setUserFieldsInProcess copies the user, capability, security-label and
// rlimit settings from the process spec oldProc into the API process p.
// Rlimits are appended to whatever p.Rlimits already holds.
func setUserFieldsInProcess(p *types.Process, oldProc specs.ProcessSpec) {
	user := oldProc.User
	p.User = &types.User{
		Uid:            user.UID,
		Gid:            user.GID,
		AdditionalGids: user.AdditionalGids,
	}
	p.Capabilities = oldProc.Capabilities
	p.ApparmorProfile = oldProc.ApparmorProfile
	p.SelinuxLabel = oldProc.SelinuxLabel
	p.NoNewPrivileges = oldProc.NoNewPrivileges
	for i := range oldProc.Rlimits {
		limit := oldProc.Rlimits[i]
		p.Rlimits = append(p.Rlimits, &types.Rlimit{
			Type: limit.Type,
			Soft: limit.Soft,
			Hard: limit.Hard,
		})
	}
}
// setPlatformRuntimeProcessSpecUserFields copies the user, capability,
// security-label and rlimit settings from the AddProcess request r into the
// process spec. Rlimits are appended to whatever process.Rlimits already
// holds.
func setPlatformRuntimeProcessSpecUserFields(r *types.AddProcessRequest, process *specs.ProcessSpec) {
	// r.User is a nested message and is nil when the client omits it; in that
	// case leave process.User at its current value instead of dereferencing
	// a nil pointer.
	if r.User != nil {
		process.User = ocs.User{
			UID:            r.User.Uid,
			GID:            r.User.Gid,
			AdditionalGids: r.User.AdditionalGids,
		}
	}
	process.Capabilities = r.Capabilities
	process.ApparmorProfile = r.ApparmorProfile
	process.SelinuxLabel = r.SelinuxLabel
	process.NoNewPrivileges = r.NoNewPrivileges
	for _, rl := range r.Rlimits {
		process.Rlimits = append(process.Rlimits, ocs.Rlimit{
			Type: rl.Type,
			Soft: rl.Soft,
			Hard: rl.Hard,
		})
	}
}

View file

@ -2,8 +2,11 @@ package main
import ( import (
"fmt" "fmt"
"log"
"net"
"os" "os"
"os/signal" "os/signal"
"runtime"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
@ -12,18 +15,24 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/cloudfoundry/gosigar"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/cyberdelia/go-metrics-graphite"
"github.com/docker/containerd" "github.com/docker/containerd"
"github.com/docker/containerd/api/grpc/server" "github.com/docker/containerd/api/grpc/server"
"github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/api/http/pprof"
"github.com/docker/containerd/osutils" "github.com/docker/containerd/osutils"
"github.com/docker/containerd/supervisor" "github.com/docker/containerd/supervisor"
"github.com/docker/docker/pkg/listeners" "github.com/docker/docker/pkg/listeners"
"github.com/rcrowley/go-metrics"
) )
const ( const (
usage = `High performance container daemon` usage = `High performance container daemon`
minRlimit = 1024 minRlimit = 1024
defaultStateDir = "/run/containerd"
defaultGRPCEndpoint = "unix:///run/containerd/containerd.sock"
) )
var daemonFlags = []cli.Flag{ var daemonFlags = []cli.Flag{
@ -75,11 +84,14 @@ var daemonFlags = []cli.Flag{
Value: 500, Value: 500,
Usage: "number of past events to keep in the event log", Usage: "number of past events to keep in the event log",
}, },
cli.StringFlag{
Name: "graphite-address",
Usage: "Address of graphite server",
},
} }
func main() { func main() {
logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano}) logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano})
appendPlatformFlags()
app := cli.NewApp() app := cli.NewApp()
app.Name = "containerd" app.Name = "containerd"
if containerd.GitCommit != "" { if containerd.GitCommit != "" {
@ -89,7 +101,24 @@ func main() {
} }
app.Usage = usage app.Usage = usage
app.Flags = daemonFlags app.Flags = daemonFlags
setAppBefore(app) app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
logrus.SetLevel(logrus.DebugLevel)
if context.GlobalDuration("metrics-interval") > 0 {
if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil {
return err
}
}
}
if p := context.GlobalString("pprof-address"); len(p) > 0 {
pprof.Enable(p)
}
if err := checkLimits(); err != nil {
return err
}
return nil
}
app.Action = func(context *cli.Context) { app.Action = func(context *cli.Context) {
if err := daemon(context); err != nil { if err := daemon(context); err != nil {
@ -183,3 +212,72 @@ func getDefaultID() string {
} }
return hostname return hostname
} }
// checkLimits reads the process's RLIMIT_NOFILE and, when the current value
// is at or below minRlimit, logs a warning and raises it to the maximum.
// It returns any error from reading or setting the limit.
func checkLimits() error {
	var limit syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
		return err
	}
	if limit.Cur > minRlimit {
		return nil
	}
	logrus.WithFields(logrus.Fields{
		"current": limit.Cur,
		"max":     limit.Max,
	}).Warn("containerd: low RLIMIT_NOFILE changing to max")
	limit.Cur = limit.Max
	return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit)
}
// processMetrics registers three gauges in the default metrics registry
// (goroutine count, open file descriptors, memory used) and starts a
// goroutine that samples them immediately and then every 30 seconds.
func processMetrics() {
	goroutines := metrics.NewGauge()
	openFds := metrics.NewGauge()
	memUsed := metrics.NewGauge()
	metrics.DefaultRegistry.Register("goroutines", goroutines)
	metrics.DefaultRegistry.Register("fds", openFds)
	metrics.DefaultRegistry.Register("memory-used", memUsed)

	// sample refreshes all three gauges; failures are logged and the gauge is
	// still updated with whatever value came back.
	sample := func() {
		goroutines.Update(int64(runtime.NumGoroutine()))

		fds, err := osutils.GetOpenFds(os.Getpid())
		if err != nil {
			logrus.WithField("error", err).Error("containerd: get open fd count")
		}
		openFds.Update(int64(fds))

		mem := sigar.ProcMem{}
		if err := mem.Get(os.Getpid()); err != nil {
			logrus.WithField("error", err).Error("containerd: get pid memory information")
		}
		memUsed.Update(int64(mem.Size))
	}

	go func() {
		sample()
		ticks := time.Tick(30 * time.Second)
		for range ticks {
			sample()
		}
	}()
}
// debugMetrics registers the supervisor metrics in the default registry,
// starts process metric collection, and then starts one reporter goroutine:
// a graphite reporter when graphiteAddr is set, otherwise a logger writing to
// stdout every interval. It returns an error when a metric cannot be
// registered or the graphite address cannot be resolved.
func debugMetrics(interval time.Duration, graphiteAddr string) error {
	for name, metric := range supervisor.Metrics() {
		err := metrics.DefaultRegistry.Register(name, metric)
		if err != nil {
			return err
		}
	}
	processMetrics()

	if graphiteAddr == "" {
		logger := log.New(os.Stdout, "[containerd] ", log.LstdFlags)
		go metrics.Log(metrics.DefaultRegistry, interval, logger)
		return nil
	}

	addr, err := net.ResolveTCPAddr("tcp", graphiteAddr)
	if err != nil {
		return err
	}
	// The graphite reporter uses a fixed 10e9 interval, not the interval argument.
	go graphite.Graphite(metrics.DefaultRegistry, 10e9, "metrics", addr)
	return nil
}

View file

@ -1,121 +0,0 @@
package main
import (
"log"
"net"
"os"
"runtime"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/cloudfoundry/gosigar"
"github.com/codegangsta/cli"
"github.com/cyberdelia/go-metrics-graphite"
"github.com/docker/containerd/api/http/pprof"
"github.com/docker/containerd/osutils"
"github.com/docker/containerd/supervisor"
"github.com/rcrowley/go-metrics"
)
const (
defaultStateDir = "/run/containerd"
defaultGRPCEndpoint = "unix:///run/containerd/containerd.sock"
)
func appendPlatformFlags() {
daemonFlags = append(daemonFlags, cli.StringFlag{
Name: "graphite-address",
Usage: "Address of graphite server",
})
}
// setAppBefore installs the app's Before hook. The hook enables debug logging
// and (when a positive metrics interval is set) debug metrics if the debug
// flag is on, enables pprof when an address is given, and finally checks the
// process file-descriptor limits.
func setAppBefore(app *cli.App) {
	app.Before = func(context *cli.Context) error {
		if context.GlobalBool("debug") {
			logrus.SetLevel(logrus.DebugLevel)
			if interval := context.GlobalDuration("metrics-interval"); interval > 0 {
				err := debugMetrics(interval, context.GlobalString("graphite-address"))
				if err != nil {
					return err
				}
			}
		}
		if addr := context.GlobalString("pprof-address"); addr != "" {
			pprof.Enable(addr)
		}
		return checkLimits()
	}
}
// checkLimits reads the process's RLIMIT_NOFILE and, when the current value
// is at or below minRlimit, logs a warning and raises it to the maximum.
// It returns any error from reading or setting the limit.
func checkLimits() error {
	var limit syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
		return err
	}
	if limit.Cur > minRlimit {
		return nil
	}
	logrus.WithFields(logrus.Fields{
		"current": limit.Cur,
		"max":     limit.Max,
	}).Warn("containerd: low RLIMIT_NOFILE changing to max")
	limit.Cur = limit.Max
	return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit)
}
// processMetrics registers three gauges in the default metrics registry
// (goroutine count, open file descriptors, memory used) and starts a
// goroutine that samples them immediately and then every 30 seconds.
func processMetrics() {
	goroutines := metrics.NewGauge()
	openFds := metrics.NewGauge()
	memUsed := metrics.NewGauge()
	metrics.DefaultRegistry.Register("goroutines", goroutines)
	metrics.DefaultRegistry.Register("fds", openFds)
	metrics.DefaultRegistry.Register("memory-used", memUsed)

	// sample refreshes all three gauges; failures are logged and the gauge is
	// still updated with whatever value came back.
	sample := func() {
		goroutines.Update(int64(runtime.NumGoroutine()))

		fds, err := osutils.GetOpenFds(os.Getpid())
		if err != nil {
			logrus.WithField("error", err).Error("containerd: get open fd count")
		}
		openFds.Update(int64(fds))

		mem := sigar.ProcMem{}
		if err := mem.Get(os.Getpid()); err != nil {
			logrus.WithField("error", err).Error("containerd: get pid memory information")
		}
		memUsed.Update(int64(mem.Size))
	}

	go func() {
		sample()
		ticks := time.Tick(30 * time.Second)
		for range ticks {
			sample()
		}
	}()
}
// debugMetrics registers the supervisor metrics in the default registry,
// starts process metric collection, and then starts one reporter goroutine:
// a graphite reporter when graphiteAddr is set, otherwise a logger writing to
// stdout every interval. It returns an error when a metric cannot be
// registered or the graphite address cannot be resolved.
func debugMetrics(interval time.Duration, graphiteAddr string) error {
	for name, metric := range supervisor.Metrics() {
		err := metrics.DefaultRegistry.Register(name, metric)
		if err != nil {
			return err
		}
	}
	processMetrics()

	if graphiteAddr == "" {
		logger := log.New(os.Stdout, "[containerd] ", log.LstdFlags)
		go metrics.Log(metrics.DefaultRegistry, interval, logger)
		return nil
	}

	addr, err := net.ResolveTCPAddr("tcp", graphiteAddr)
	if err != nil {
		return err
	}
	// The graphite reporter uses a fixed 10e9 interval, not the interval argument.
	go graphite.Graphite(metrics.DefaultRegistry, 10e9, "metrics", addr)
	return nil
}

View file

@ -621,3 +621,23 @@ type stdio struct {
stdout string stdout string
stderr string stderr string
} }
// createStdio creates a fresh temporary directory (prefix "ctr-") holding
// three FIFOs named stdin, stdout and stderr, and returns their paths.
//
// If a FIFO cannot be created the temporary directory is removed again and a
// zero stdio is returned together with the error.
func createStdio() (s stdio, err error) {
	tmp, err := ioutil.TempDir("", "ctr-")
	if err != nil {
		return s, err
	}
	// create fifo's for the process
	for name, fd := range map[string]*string{
		"stdin":  &s.stdin,
		"stdout": &s.stdout,
		"stderr": &s.stderr,
	} {
		path := filepath.Join(tmp, name)
		if mkErr := syscall.Mkfifo(path, 0755); mkErr != nil && !os.IsExist(mkErr) {
			// Do not leak the directory or hand back partially filled paths.
			os.RemoveAll(tmp)
			return stdio{}, mkErr
		}
		*fd = path
	}
	return s, nil
}

View file

@ -1,30 +0,0 @@
// +build !windows
package main
import (
"io/ioutil"
"os"
"path/filepath"
"syscall"
)
// createStdio creates a fresh temporary directory (prefix "ctr-") holding
// three FIFOs named stdin, stdout and stderr, and returns their paths.
//
// If a FIFO cannot be created the temporary directory is removed again and a
// zero stdio is returned together with the error.
func createStdio() (s stdio, err error) {
	tmp, err := ioutil.TempDir("", "ctr-")
	if err != nil {
		return s, err
	}
	// create fifo's for the process
	for name, fd := range map[string]*string{
		"stdin":  &s.stdin,
		"stdout": &s.stdout,
		"stderr": &s.stderr,
	} {
		path := filepath.Join(tmp, name)
		if mkErr := syscall.Mkfifo(path, 0755); mkErr != nil && !os.IsExist(mkErr) {
			// Do not leak the directory or hand back partially filled paths.
			os.RemoveAll(tmp)
			return stdio{}, mkErr
		}
		*fd = path
	}
	return s, nil
}

View file

@ -1,6 +0,0 @@
package main
// createStdio is a placeholder that creates nothing and returns an empty
// stdio with a nil error.
// TODO Windows: This will have a very different implementation
func createStdio() (s stdio, err error) {
	return s, nil
}

View file

@ -2,15 +2,20 @@ package runtime
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings"
"syscall"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/specs" "github.com/docker/containerd/specs"
"github.com/opencontainers/runc/libcontainer"
ocs "github.com/opencontainers/runtime-spec/specs-go"
) )
type Container interface { type Container interface {
@ -271,3 +276,485 @@ func (c *container) UpdateResources(r *Resource) error {
config.Cgroups.Resources.MemorySwap = r.MemorySwap config.Cgroups.Resources.MemorySwap = r.MemorySwap
return container.Set(config) return container.Set(config)
} }
// getRootIDs returns the host uid and gid that container id 0 maps to.
//
// Both are 0 when s is nil or the spec declares no user namespace; otherwise
// they are looked up in the spec's UID and GID mappings. The returned error
// is always nil.
func getRootIDs(s *specs.Spec) (int, int, error) {
	if s == nil {
		return 0, 0, nil
	}
	for _, ns := range s.Linux.Namespaces {
		if ns.Type != ocs.UserNamespace {
			continue
		}
		// A user namespace is configured: translate root through the maps.
		uid := hostIDFromMap(0, s.Linux.UIDMappings)
		gid := hostIDFromMap(0, s.Linux.GIDMappings)
		return uid, gid, nil
	}
	return 0, 0, nil
}
// State reports the state of the process registered under "init", or Stopped
// when no such process is registered.
func (c *container) State() State {
	if p := c.processes["init"]; p != nil {
		return p.State()
	}
	return Stopped
}
// Runtime returns the runtime command configured for this container.
func (c *container) Runtime() string {
	return c.runtime
}
// Pause runs "<runtime> <runtimeArgs...> pause <id>".
//
// On failure the returned error carries the runtime's combined output, or the
// exec error itself when the runtime printed nothing.
func (c *container) Pause() error {
	// Build the argument list in a fresh slice: appending to c.runtimeArgs
	// directly could write into its shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "pause", c.id)
	b, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(b) == 0 {
			return err
		}
		// Never use runtime output as a format string; '%' would be mangled.
		return fmt.Errorf("%s", b)
	}
	return nil
}
// Resume runs "<runtime> <runtimeArgs...> resume <id>".
//
// On failure the returned error carries the runtime's combined output, or the
// exec error itself when the runtime printed nothing.
func (c *container) Resume() error {
	// Build the argument list in a fresh slice: appending to c.runtimeArgs
	// directly could write into its shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "resume", c.id)
	b, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(b) == 0 {
			return err
		}
		// Never use runtime output as a format string; '%' would be mangled.
		return fmt.Errorf("%s", b)
	}
	return nil
}
// Checkpoints lists the checkpoints stored under <bundle>/checkpoints.
//
// Every sub-directory is expected to hold a config.json describing one
// Checkpoint; non-directory entries are ignored. Any read or decode failure
// aborts the listing with that error.
func (c *container) Checkpoints() ([]Checkpoint, error) {
	root := filepath.Join(c.bundle, "checkpoints")
	entries, err := ioutil.ReadDir(root)
	if err != nil {
		return nil, err
	}
	var out []Checkpoint
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		data, err := ioutil.ReadFile(filepath.Join(root, entry.Name(), "config.json"))
		if err != nil {
			return nil, err
		}
		var cpt Checkpoint
		if err := json.Unmarshal(data, &cpt); err != nil {
			return nil, err
		}
		out = append(out, cpt)
	}
	return out, nil
}
// Checkpoint creates the directory <bundle>/checkpoints/<cpt.Name>, writes
// the checkpoint description to config.json inside it, and then invokes the
// runtime's "checkpoint" command with that directory as the image path.
//
// cpt's boolean options are translated to runtime flags. On runtime failure
// the returned error contains both the exec error and the combined output.
func (c *container) Checkpoint(cpt Checkpoint) error {
	if err := os.MkdirAll(filepath.Join(c.bundle, "checkpoints"), 0755); err != nil {
		return err
	}
	path := filepath.Join(c.bundle, "checkpoints", cpt.Name)
	// Mkdir (not MkdirAll) so that an already existing checkpoint is an error.
	if err := os.Mkdir(path, 0755); err != nil {
		return err
	}
	f, err := os.Create(filepath.Join(path, "config.json"))
	if err != nil {
		return err
	}
	cpt.Created = time.Now()
	err = json.NewEncoder(f).Encode(cpt)
	// A failed Close can mean the description was not fully written.
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		return err
	}

	var args []string
	add := func(flags ...string) {
		args = append(args, flags...)
	}
	// Runtime-wide arguments go before the subcommand, as in Pause, Resume
	// and Status; placed after "checkpoint" they would be parsed as options
	// of the subcommand.
	add(c.runtimeArgs...)
	add("checkpoint", "--image-path", path)
	if !cpt.Exit {
		add("--leave-running")
	}
	if cpt.Shell {
		add("--shell-job")
	}
	if cpt.Tcp {
		add("--tcp-established")
	}
	if cpt.UnixSockets {
		add("--ext-unix-sk")
	}
	add(c.id)
	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s: %s", err.Error(), string(out))
	}
	return nil
}
// DeleteCheckpoint removes the directory <bundle>/checkpoints/<name> and
// everything below it.
func (c *container) DeleteCheckpoint(name string) error {
	dir := filepath.Join(c.bundle, "checkpoints", name)
	return os.RemoveAll(dir)
}
// Start creates the init process directory under the container root, reads
// the container spec, and launches the shim for the init process.
//
// checkpoint and s are passed through to the process configuration. The new
// process is returned once startCmd has succeeded.
func (c *container) Start(checkpoint string, s Stdio) (Process, error) {
	root := filepath.Join(c.root, c.id, InitProcessID)
	if err := os.Mkdir(root, 0755); err != nil {
		return nil, err
	}

	// The shim runs inside the process directory with Setpgid enabled.
	shim := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shim.Dir = root
	shim.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	p, err := newProcess(&processConfig{
		checkpoint:  checkpoint,
		root:        root,
		id:          InitProcessID,
		c:           c,
		stdio:       s,
		spec:        spec,
		processSpec: specs.ProcessSpec(spec.Process),
	})
	if err != nil {
		return nil, err
	}
	if err := c.startCmd(InitProcessID, shim, p); err != nil {
		return nil, err
	}
	return p, nil
}
// Exec starts an additional process, identified by pid, in the container.
//
// It creates the process directory, launches the shim there with the supplied
// process spec and stdio, and returns the new process. If anything fails
// after the directory was created, the process is removed again through
// c.RemoveProcess.
func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) {
	root := filepath.Join(c.root, c.id, pid)
	if err := os.Mkdir(root, 0755); err != nil {
		return nil, err
	}
	// Relies on the named result err being set by every failing return below.
	defer func() {
		if err != nil {
			c.RemoveProcess(pid)
		}
	}()

	// The shim runs inside the process directory with Setpgid enabled.
	shim := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shim.Dir = root
	shim.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	p, err := newProcess(&processConfig{
		exec:        true,
		id:          pid,
		root:        root,
		c:           c,
		processSpec: pspec,
		spec:        spec,
		stdio:       s,
	})
	if err != nil {
		return nil, err
	}
	if err := c.startCmd(pid, shim, p); err != nil {
		return nil, err
	}
	return p, nil
}
// startCmd starts the shim command, waits for the process to come up via
// waitForStart, and then registers p under pid in the container's process
// table.
//
// When the shim executable cannot be found, a "not installed on system" error
// naming the shim is returned instead of the raw exec error.
func (c *container) startCmd(pid string, cmd *exec.Cmd, p *process) error {
	if err := cmd.Start(); err != nil {
		if exErr, ok := err.(*exec.Error); ok {
			// os.IsNotExist also recognises not-exist errors wrapped in
			// *os.PathError, which a plain == os.ErrNotExist never matches.
			if exErr.Err == exec.ErrNotFound || os.IsNotExist(exErr.Err) {
				return fmt.Errorf("%s not installed on system", c.shim)
			}
		}
		return err
	}
	if err := c.waitForStart(p, cmd); err != nil {
		return err
	}
	c.processes[pid] = p
	return nil
}
// getLibctContainer loads this container through a libcontainer factory
// rooted at the runtime's state directory.
//
// The root defaults to /run/runc and is overridden by a --root option in the
// container's runtime arguments, given either as "--root=PATH" or as
// "--root" followed by PATH.
func (c *container) getLibctContainer() (libcontainer.Container, error) {
	runtimeRoot := "/run/runc"

	// Check that the root wasn't changed
	for i, opt := range c.runtimeArgs {
		if strings.HasPrefix(opt, "--root=") {
			runtimeRoot = strings.TrimPrefix(opt, "--root=")
			break
		}
		// Separate-argument form: the value is the next argument.
		if opt == "--root" && i+1 < len(c.runtimeArgs) {
			runtimeRoot = c.runtimeArgs[i+1]
			break
		}
	}

	f, err := libcontainer.New(runtimeRoot, libcontainer.Cgroupfs)
	if err != nil {
		return nil, err
	}
	return f.Load(c.id)
}
// hostIDFromMap translates a container id to the corresponding host id using
// the first mapping in mp whose range contains id. It returns 0 when no
// mapping matches.
func hostIDFromMap(id uint32, mp []ocs.IDMapping) int {
	for _, m := range mp {
		// Compare the offset against Size instead of computing
		// ContainerID+Size-1, which wraps around for Size == 0 and for ranges
		// that end at the top of the uint32 space.
		if id >= m.ContainerID && id-m.ContainerID < m.Size {
			return int(m.HostID + (id - m.ContainerID))
		}
	}
	return 0
}
// Pids returns the process ids reported by libcontainer for this container.
func (c *container) Pids() ([]int, error) {
	ct, loadErr := c.getLibctContainer()
	if loadErr != nil {
		return nil, loadErr
	}
	return ct.Processes()
}
// Stats returns the libcontainer statistics for this container together with
// a timestamp taken immediately before the statistics were collected.
func (c *container) Stats() (*Stat, error) {
	ct, err := c.getLibctContainer()
	if err != nil {
		return nil, err
	}
	// The timestamp is captured before the (potentially slow) collection.
	result := &Stat{Timestamp: time.Now()}
	if result.Data, err = ct.Stats(); err != nil {
		return nil, err
	}
	return result, nil
}
// OOM returns an OOM notifier for the container, built by getMemeoryEventFD
// from the container's "memory" cgroup path.
//
// ErrContainerExited is returned when libcontainer reports that the container
// no longer exists; any other failure is returned unchanged.
func (c *container) OOM() (OOM, error) {
	ct, err := c.getLibctContainer()
	if err != nil {
		// with oom registration sometimes the container can run, exit, and be destroyed
		// faster than we can get the state back so we can just ignore this
		if lerr, ok := err.(libcontainer.Error); ok && lerr.Code() == libcontainer.ContainerNotExists {
			return nil, ErrContainerExited
		}
		return nil, err
	}
	state, err := ct.State()
	if err != nil {
		return nil, err
	}
	o, err := c.getMemeoryEventFD(state.CgroupPaths["memory"])
	if err != nil {
		// Return an untyped nil: forwarding the nil *oom directly would wrap
		// it in a non-nil OOM interface value.
		return nil, err
	}
	return o, nil
}
// Status implements the runtime Container interface.
//
// It runs "<runtime> <runtimeArgs...> state <id>" and decodes the top level
// "status" field of the JSON the runtime prints. When the command fails the
// returned error carries the command's combined output, or the exec error
// itself if the command printed nothing.
func (c *container) Status() (State, error) {
	// Copy the arguments: appending to c.runtimeArgs directly may write into
	// spare capacity of the shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "state", c.id)

	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(out) == 0 {
			return "", err
		}
		// The output is data, not a format string; a '%' in it must not be
		// interpreted as a verb.
		return "", fmt.Errorf("%s", out)
	}
	// We only require the runtime json output to have a top level Status field.
	var s struct {
		Status State `json:"status"`
	}
	if err := json.Unmarshal(out, &s); err != nil {
		return "", err
	}
	return s.Status, nil
}
// getMemeoryEventFD opens memory.oom_control under the cgroup directory root,
// creates an eventfd, and registers the pair through writeEventFD. The
// returned oom owns both descriptors; on any failure everything opened so far
// is closed again.
func (c *container) getMemeoryEventFD(root string) (*oom, error) {
	control, err := os.Open(filepath.Join(root, "memory.oom_control"))
	if err != nil {
		return nil, err
	}
	// eventfd2 takes EFD_* flags. EFD_CLOEXEC has the value of O_CLOEXEC;
	// FD_CLOEXEC (value 1) is an fcntl flag that eventfd2 interprets as
	// EFD_SEMAPHORE and that does not set close-on-exec at all.
	raw, _, errno := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, syscall.O_CLOEXEC, 0)
	if errno != 0 {
		control.Close()
		return nil, errno
	}
	eventfd := int(raw)
	if err := c.writeEventFD(root, int(control.Fd()), eventfd); err != nil {
		syscall.Close(eventfd)
		control.Close()
		return nil, err
	}
	return &oom{
		root:    root,
		id:      c.id,
		eventfd: eventfd,
		control: control,
	}, nil
}
// writeEventFD writes "<efd> <cfd>" to cgroup.event_control under the cgroup
// directory root, where efd is the eventfd and cfd the control file
// descriptor.
func (c *container) writeEventFD(root string, cfd, efd int) error {
	ctl, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0)
	if err != nil {
		return err
	}
	defer ctl.Close()

	// Fprintf formats into a buffer and issues a single Write.
	_, err = fmt.Fprintf(ctl, "%d %d", efd, cfd)
	return err
}
// waitArgs pairs a pid with an error.
// NOTE(review): no use of this type is visible in this part of the file;
// confirm whether it is still needed.
type waitArgs struct {
pid int
err error
}
// waitForStart blocks until the process p has started, the shim has died, or
// c.timeout has elapsed.
//
// A goroutine polls p.getPidFromFile every 15ms. A successful read means the
// process started and nil is returned. While the pid cannot be read yet, the
// shim's liveness is checked; once the shim is gone, the first "error" level
// entry in shim-log.json, then in log.json (both under p.root), is returned
// as the error, falling back to ErrContainerNotStarted. On timeout the shim
// is killed and reaped and ErrContainerStartTimeout is returned.
func (c *container) waitForStart(p *process, cmd *exec.Cmd) error {
// Capacity 1 lets the goroutine deliver its single result and exit even if
// the select below has already returned on timeout.
wc := make(chan error, 1)
go func() {
for {
if _, err := p.getPidFromFile(); err != nil {
// These two errors are treated as "pid not available yet" and lead to
// a retry; any other error is reported as-is further down.
if os.IsNotExist(err) || err == errInvalidPidInt {
alive, err := isAlive(cmd)
if err != nil {
wc <- err
return
}
if !alive {
// runc could have failed to run the container so lets get the error
// out of the logs or the shim could have encountered an error
messages, err := readLogMessages(filepath.Join(p.root, "shim-log.json"))
if err != nil {
wc <- err
return
}
for _, m := range messages {
if m.Level == "error" {
wc <- fmt.Errorf("shim error: %v", m.Msg)
return
}
}
// no errors reported back from shim, check for runc/runtime errors
messages, err = readLogMessages(filepath.Join(p.root, "log.json"))
if err != nil {
// A missing runtime log is reported as "not started" rather than
// as a file error.
if os.IsNotExist(err) {
err = ErrContainerNotStarted
}
wc <- err
return
}
for _, m := range messages {
if m.Level == "error" {
wc <- fmt.Errorf("oci runtime error: %v", m.Msg)
return
}
}
wc <- ErrContainerNotStarted
return
}
// Shim still alive and no pid yet: poll again shortly.
time.Sleep(15 * time.Millisecond)
continue
}
wc <- err
return
}
// the pid file was read successfully
wc <- nil
return
}
}()
select {
case err := <-wc:
if err != nil {
return err
}
return nil
case <-time.After(c.timeout):
// Kill and reap the shim; errors from both calls are ignored.
cmd.Process.Kill()
cmd.Wait()
return ErrContainerStartTimeout
}
}
// isAlive checks if the shim that launched the container is still alive.
//
// It probes the shim's pid with signal 0: success means alive, ESRCH means
// the process is gone, and any other errno is returned as an error.
func isAlive(cmd *exec.Cmd) (bool, error) {
	switch err := syscall.Kill(cmd.Process.Pid, 0); err {
	case nil:
		return true, nil
	case syscall.ESRCH:
		return false, nil
	default:
		return false, err
	}
}
// oom holds the resources behind one container's OOM notification.
type oom struct {
// id is the container id.
id string
// root is the cgroup directory the notification was registered in.
root string
// control is the opened memory.oom_control file.
control *os.File
// eventfd is the eventfd registered for the notification.
eventfd int
}
// ContainerID returns the id of the container this notifier belongs to.
func (o *oom) ContainerID() string {
return o.id
}
// FD returns the eventfd descriptor.
func (o *oom) FD() int {
return o.eventfd
}
// Flush reads up to 8 bytes from the eventfd and discards them; the result
// and any error of the read are ignored.
func (o *oom) Flush() {
	var counter [8]byte
	syscall.Read(o.eventfd, counter[:])
}
// Removed reports whether cgroup.event_control no longer exists under the
// cgroup directory. Any other Lstat outcome, including other errors, yields
// false.
func (o *oom) Removed() bool {
	eventControl := filepath.Join(o.root, "cgroup.event_control")
	if _, err := os.Lstat(eventControl); err != nil {
		return os.IsNotExist(err)
	}
	return false
}
// Close closes both the eventfd and the control file. Both are always
// attempted; the eventfd error takes precedence when both fail.
func (o *oom) Close() error {
	fdErr := syscall.Close(o.eventfd)
	fileErr := o.control.Close()
	if fdErr != nil {
		return fdErr
	}
	return fileErr
}
// message is one entry decoded from a JSON log file by readLogMessages.
type message struct {
// Level is the entry's "level" field; callers compare it against "error".
Level string `json:"level"`
// Msg is the entry's "msg" field.
Msg string `json:"msg"`
}
// readLogMessages decodes the file at path as a stream of JSON objects and
// returns them in order. An empty file yields a nil slice and a nil error;
// an open or decode failure yields a nil slice and the error.
func readLogMessages(path string) ([]message, error) {
	logFile, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer logFile.Close()

	var (
		entries []message
		decoder = json.NewDecoder(logFile)
	)
	for {
		var entry message
		err := decoder.Decode(&entry)
		if err == io.EOF {
			return entries, nil
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
}

View file

@ -1,500 +0,0 @@
package runtime
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/docker/containerd/specs"
"github.com/opencontainers/runc/libcontainer"
ocs "github.com/opencontainers/runtime-spec/specs-go"
)
// getRootIDs returns the host uid and gid that container id 0 maps to.
//
// (0, 0) is returned when s is nil or when the spec declares no user
// namespace. The error result is always nil.
func getRootIDs(s *specs.Spec) (int, int, error) {
	if s == nil {
		return 0, 0, nil
	}
	for _, ns := range s.Linux.Namespaces {
		if ns.Type != ocs.UserNamespace {
			continue
		}
		// A user namespace is configured: translate root through the maps.
		uid := hostIDFromMap(0, s.Linux.UIDMappings)
		gid := hostIDFromMap(0, s.Linux.GIDMappings)
		return uid, gid, nil
	}
	return 0, 0, nil
}
// State returns the state of the container's "init" process, or Stopped when
// no such process is registered.
func (c *container) State() State {
	if initProc := c.processes["init"]; initProc != nil {
		return initProc.State()
	}
	return Stopped
}
// Runtime returns the runtime configured for this container, the same value
// that is executed as a command by Pause, Resume and Checkpoint.
func (c *container) Runtime() string {
return c.runtime
}
// Pause runs "<runtime> <runtimeArgs...> pause <id>".
//
// When the command fails the returned error carries the command's combined
// output, or the exec error itself if the command printed nothing.
func (c *container) Pause() error {
	// Copy the arguments: appending to c.runtimeArgs directly may write into
	// spare capacity of the shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "pause", c.id)

	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(out) == 0 {
			return err
		}
		// The output is data, not a format string; a '%' in it must not be
		// interpreted as a verb.
		return fmt.Errorf("%s", out)
	}
	return nil
}
// Resume runs "<runtime> <runtimeArgs...> resume <id>".
//
// When the command fails the returned error carries the command's combined
// output, or the exec error itself if the command printed nothing.
func (c *container) Resume() error {
	// Copy the arguments: appending to c.runtimeArgs directly may write into
	// spare capacity of the shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "resume", c.id)

	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(out) == 0 {
			return err
		}
		// The output is data, not a format string; a '%' in it must not be
		// interpreted as a verb.
		return fmt.Errorf("%s", out)
	}
	return nil
}
// Checkpoints lists the checkpoints stored under "<bundle>/checkpoints".
//
// Every sub-directory is expected to hold a config.json describing one
// Checkpoint; plain files are skipped. The first read or decode failure
// aborts the listing.
func (c *container) Checkpoints() ([]Checkpoint, error) {
	base := filepath.Join(c.bundle, "checkpoints")
	entries, err := ioutil.ReadDir(base)
	if err != nil {
		return nil, err
	}
	var found []Checkpoint
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		raw, err := ioutil.ReadFile(filepath.Join(base, entry.Name(), "config.json"))
		if err != nil {
			return nil, err
		}
		var cpt Checkpoint
		if err := json.Unmarshal(raw, &cpt); err != nil {
			return nil, err
		}
		found = append(found, cpt)
	}
	return found, nil
}
// Checkpoint creates the directory "<bundle>/checkpoints/<name>", writes the
// checkpoint description (with Created set to the current time) to its
// config.json, and then runs the runtime's "checkpoint" command with that
// directory as the image path.
//
// The command line is: checkpoint --image-path <dir>, the runtime arguments,
// the optional flags selected by cpt, and finally the container id. When the
// command fails the error combines the exec error with the command output.
func (c *container) Checkpoint(cpt Checkpoint) error {
	base := filepath.Join(c.bundle, "checkpoints")
	if err := os.MkdirAll(base, 0755); err != nil {
		return err
	}
	path := filepath.Join(base, cpt.Name)
	// Mkdir (not MkdirAll) so an existing checkpoint of the same name fails.
	if err := os.Mkdir(path, 0755); err != nil {
		return err
	}

	cfg, err := os.Create(filepath.Join(path, "config.json"))
	if err != nil {
		return err
	}
	cpt.Created = time.Now()
	err = json.NewEncoder(cfg).Encode(cpt)
	cfg.Close()
	if err != nil {
		return err
	}

	args := []string{"checkpoint", "--image-path", path}
	args = append(args, c.runtimeArgs...)
	optional := []struct {
		enabled bool
		flag    string
	}{
		{!cpt.Exit, "--leave-running"},
		{cpt.Shell, "--shell-job"},
		{cpt.Tcp, "--tcp-established"},
		{cpt.UnixSockets, "--ext-unix-sk"},
	}
	for _, opt := range optional {
		if opt.enabled {
			args = append(args, opt.flag)
		}
	}
	args = append(args, c.id)

	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s: %s", err.Error(), string(out))
	}
	return nil
}
// DeleteCheckpoint removes the directory "<bundle>/checkpoints/<name>" and
// everything below it.
// NOTE(review): name is joined into the path as given; confirm that callers
// never pass values containing path separators or "..".
func (c *container) DeleteCheckpoint(name string) error {
return os.RemoveAll(filepath.Join(c.bundle, "checkpoints", name))
}
// Start launches the container's init process.
//
// A directory for the init process is created under the container root, the
// shim is launched from that directory, and the resulting process is
// registered via startCmd. checkpoint is passed through to the process
// configuration unchanged.
func (c *container) Start(checkpoint string, s Stdio) (Process, error) {
	procDir := filepath.Join(c.root, c.id, InitProcessID)
	if err := os.Mkdir(procDir, 0755); err != nil {
		return nil, err
	}

	shim := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shim.Dir = procDir
	shim.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	proc, err := newProcess(&processConfig{
		checkpoint:  checkpoint,
		root:        procDir,
		id:          InitProcessID,
		c:           c,
		stdio:       s,
		spec:        spec,
		processSpec: specs.ProcessSpec(spec.Process),
	})
	if err != nil {
		return nil, err
	}
	if err := c.startCmd(InitProcessID, shim, proc); err != nil {
		return nil, err
	}
	return proc, nil
}
// Exec starts an additional process, identified by pid, in the container.
//
// A per-process directory is created under the container root, the shim is
// launched from that directory, and the resulting process is registered via
// startCmd. If anything fails after the directory has been created, the
// process is removed again with RemoveProcess before the error is returned.
func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) {
	procDir := filepath.Join(c.root, c.id, pid)
	if mkErr := os.Mkdir(procDir, 0755); mkErr != nil {
		return nil, mkErr
	}
	// err is the named result, so the deferred func observes whatever error
	// is being returned from any of the paths below.
	defer func() {
		if err != nil {
			c.RemoveProcess(pid)
		}
	}()

	shim := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shim.Dir = procDir
	shim.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	proc, err := newProcess(&processConfig{
		exec:        true,
		id:          pid,
		root:        procDir,
		c:           c,
		processSpec: pspec,
		spec:        spec,
		stdio:       s,
	})
	if err != nil {
		return nil, err
	}
	if err = c.startCmd(pid, shim, proc); err != nil {
		return nil, err
	}
	return proc, nil
}
// startCmd launches the shim command, waits for the process to come up via
// waitForStart, and on success records p in c.processes under pid.
//
// When the shim binary cannot be found, the error is replaced with a
// "<shim> not installed on system" message.
func (c *container) startCmd(pid string, cmd *exec.Cmd, p *process) error {
	err := cmd.Start()
	if err != nil {
		exErr, ok := err.(*exec.Error)
		if ok && (exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist) {
			return fmt.Errorf("%s not installed on system", c.shim)
		}
		return err
	}
	if err = c.waitForStart(p, cmd); err != nil {
		return err
	}
	c.processes[pid] = p
	return nil
}
// getLibctContainer loads the libcontainer view of this container.
//
// The state root defaults to "/run/runc"; the first runtime argument of the
// form "--root=<dir>" overrides it.
func (c *container) getLibctContainer() (libcontainer.Container, error) {
	const rootFlag = "--root="

	root := "/run/runc"
	for _, arg := range c.runtimeArgs {
		if !strings.HasPrefix(arg, rootFlag) {
			continue
		}
		// Only the first matching flag is honoured.
		root = strings.TrimPrefix(arg, rootFlag)
		break
	}

	factory, err := libcontainer.New(root, libcontainer.Cgroupfs)
	if err != nil {
		return nil, err
	}
	return factory.Load(c.id)
}
// hostIDFromMap translates a container-side id into the host id using the
// first mapping in mp whose range [ContainerID, ContainerID+Size) contains id.
// It returns 0 when no mapping contains id.
func hostIDFromMap(id uint32, mp []ocs.IDMapping) int {
	for _, m := range mp {
		// Compare the offset against Size instead of computing
		// ContainerID+Size-1: that sum wraps around in uint32 for a
		// zero-sized mapping (or a range reaching the top of the id space),
		// which made an empty mapping match every id.
		if id >= m.ContainerID && id-m.ContainerID < m.Size {
			return int(m.HostID + (id - m.ContainerID))
		}
	}
	return 0
}
// Pids returns the process ids reported by libcontainer for this container.
func (c *container) Pids() ([]int, error) {
	ct, loadErr := c.getLibctContainer()
	if loadErr != nil {
		return nil, loadErr
	}
	return ct.Processes()
}
// Stats returns the libcontainer statistics for this container together with
// a timestamp taken immediately before the statistics were collected.
func (c *container) Stats() (*Stat, error) {
	ct, err := c.getLibctContainer()
	if err != nil {
		return nil, err
	}
	// The timestamp is captured before the (potentially slow) collection.
	result := &Stat{Timestamp: time.Now()}
	if result.Data, err = ct.Stats(); err != nil {
		return nil, err
	}
	return result, nil
}
// Status implements the runtime Container interface.
//
// It runs "<runtime> <runtimeArgs...> state <id>" and decodes the top level
// "status" field of the JSON the runtime prints. When the command fails the
// returned error carries the command's combined output, or the exec error
// itself if the command printed nothing.
func (c *container) Status() (State, error) {
	// Copy the arguments: appending to c.runtimeArgs directly may write into
	// spare capacity of the shared backing array.
	args := make([]string, 0, len(c.runtimeArgs)+2)
	args = append(args, c.runtimeArgs...)
	args = append(args, "state", c.id)

	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		if len(out) == 0 {
			return "", err
		}
		// The output is data, not a format string; a '%' in it must not be
		// interpreted as a verb.
		return "", fmt.Errorf("%s", out)
	}
	// We only require the runtime json output to have a top level Status field.
	var s struct {
		Status State `json:"status"`
	}
	if err := json.Unmarshal(out, &s); err != nil {
		return "", err
	}
	return s.Status, nil
}
// OOM returns an OOM notifier for the container, built by getMemeoryEventFD
// from the container's "memory" cgroup path.
//
// ErrContainerExited is returned when libcontainer reports that the container
// no longer exists; any other failure is returned unchanged.
func (c *container) OOM() (OOM, error) {
	ct, err := c.getLibctContainer()
	if err != nil {
		// with oom registration sometimes the container can run, exit, and be destroyed
		// faster than we can get the state back so we can just ignore this
		if lerr, ok := err.(libcontainer.Error); ok && lerr.Code() == libcontainer.ContainerNotExists {
			return nil, ErrContainerExited
		}
		return nil, err
	}
	state, err := ct.State()
	if err != nil {
		return nil, err
	}
	o, err := c.getMemeoryEventFD(state.CgroupPaths["memory"])
	if err != nil {
		// Return an untyped nil: forwarding the nil *oom directly would wrap
		// it in a non-nil OOM interface value.
		return nil, err
	}
	return o, nil
}
// getMemeoryEventFD opens memory.oom_control under the cgroup directory root,
// creates an eventfd, and registers the pair through writeEventFD. The
// returned oom owns both descriptors; on any failure everything opened so far
// is closed again.
func (c *container) getMemeoryEventFD(root string) (*oom, error) {
	control, err := os.Open(filepath.Join(root, "memory.oom_control"))
	if err != nil {
		return nil, err
	}
	// eventfd2 takes EFD_* flags. EFD_CLOEXEC has the value of O_CLOEXEC;
	// FD_CLOEXEC (value 1) is an fcntl flag that eventfd2 interprets as
	// EFD_SEMAPHORE and that does not set close-on-exec at all.
	raw, _, errno := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, syscall.O_CLOEXEC, 0)
	if errno != 0 {
		control.Close()
		return nil, errno
	}
	eventfd := int(raw)
	if err := c.writeEventFD(root, int(control.Fd()), eventfd); err != nil {
		syscall.Close(eventfd)
		control.Close()
		return nil, err
	}
	return &oom{
		root:    root,
		id:      c.id,
		eventfd: eventfd,
		control: control,
	}, nil
}
// writeEventFD writes "<efd> <cfd>" to cgroup.event_control under the cgroup
// directory root, where efd is the eventfd and cfd the control file
// descriptor.
func (c *container) writeEventFD(root string, cfd, efd int) error {
	ctl, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0)
	if err != nil {
		return err
	}
	defer ctl.Close()

	// Fprintf formats into a buffer and issues a single Write.
	_, err = fmt.Fprintf(ctl, "%d %d", efd, cfd)
	return err
}
// waitArgs pairs a pid with an error.
// NOTE(review): no use of this type is visible in this part of the file;
// confirm whether it is still needed.
type waitArgs struct {
pid int
err error
}
// waitForStart blocks until the process p has started, the shim has died, or
// c.timeout has elapsed.
//
// A goroutine polls p.getPidFromFile every 15ms. A successful read means the
// process started and nil is returned. While the pid cannot be read yet, the
// shim's liveness is checked; once the shim is gone, the first "error" level
// entry in shim-log.json, then in log.json (both under p.root), is returned
// as the error, falling back to ErrContainerNotStarted. On timeout the shim
// is killed and reaped and ErrContainerStartTimeout is returned.
func (c *container) waitForStart(p *process, cmd *exec.Cmd) error {
// Capacity 1 lets the goroutine deliver its single result and exit even if
// the select below has already returned on timeout.
wc := make(chan error, 1)
go func() {
for {
if _, err := p.getPidFromFile(); err != nil {
// These two errors are treated as "pid not available yet" and lead to
// a retry; any other error is reported as-is further down.
if os.IsNotExist(err) || err == errInvalidPidInt {
alive, err := isAlive(cmd)
if err != nil {
wc <- err
return
}
if !alive {
// runc could have failed to run the container so lets get the error
// out of the logs or the shim could have encountered an error
messages, err := readLogMessages(filepath.Join(p.root, "shim-log.json"))
if err != nil {
wc <- err
return
}
for _, m := range messages {
if m.Level == "error" {
wc <- fmt.Errorf("shim error: %v", m.Msg)
return
}
}
// no errors reported back from shim, check for runc/runtime errors
messages, err = readLogMessages(filepath.Join(p.root, "log.json"))
if err != nil {
// A missing runtime log is reported as "not started" rather than
// as a file error.
if os.IsNotExist(err) {
err = ErrContainerNotStarted
}
wc <- err
return
}
for _, m := range messages {
if m.Level == "error" {
wc <- fmt.Errorf("oci runtime error: %v", m.Msg)
return
}
}
wc <- ErrContainerNotStarted
return
}
// Shim still alive and no pid yet: poll again shortly.
time.Sleep(15 * time.Millisecond)
continue
}
wc <- err
return
}
// the pid file was read successfully
wc <- nil
return
}
}()
select {
case err := <-wc:
if err != nil {
return err
}
return nil
case <-time.After(c.timeout):
// Kill and reap the shim; errors from both calls are ignored.
cmd.Process.Kill()
cmd.Wait()
return ErrContainerStartTimeout
}
}
// isAlive checks if the shim that launched the container is still alive.
//
// It probes the shim's pid with signal 0: success means alive, ESRCH means
// the process is gone, and any other errno is returned as an error.
func isAlive(cmd *exec.Cmd) (bool, error) {
	switch err := syscall.Kill(cmd.Process.Pid, 0); err {
	case nil:
		return true, nil
	case syscall.ESRCH:
		return false, nil
	default:
		return false, err
	}
}
// oom holds the resources behind one container's OOM notification.
type oom struct {
// id is the container id.
id string
// root is the cgroup directory the notification was registered in.
root string
// control is the opened memory.oom_control file.
control *os.File
// eventfd is the eventfd registered for the notification.
eventfd int
}
// ContainerID returns the id of the container this notifier belongs to.
func (o *oom) ContainerID() string {
return o.id
}
// FD returns the eventfd descriptor.
func (o *oom) FD() int {
return o.eventfd
}
// Flush reads up to 8 bytes from the eventfd and discards them; the result
// and any error of the read are ignored.
func (o *oom) Flush() {
	var counter [8]byte
	syscall.Read(o.eventfd, counter[:])
}
// Removed reports whether cgroup.event_control no longer exists under the
// cgroup directory. Any other Lstat outcome, including other errors, yields
// false.
func (o *oom) Removed() bool {
	eventControl := filepath.Join(o.root, "cgroup.event_control")
	if _, err := os.Lstat(eventControl); err != nil {
		return os.IsNotExist(err)
	}
	return false
}
// Close closes both the eventfd and the control file. Both are always
// attempted; the eventfd error takes precedence when both fail.
func (o *oom) Close() error {
	fdErr := syscall.Close(o.eventfd)
	fileErr := o.control.Close()
	if fdErr != nil {
		return fdErr
	}
	return fileErr
}
// message is one entry decoded from a JSON log file by readLogMessages.
type message struct {
// Level is the entry's "level" field; callers compare it against "error".
Level string `json:"level"`
// Msg is the entry's "msg" field.
Msg string `json:"msg"`
}
// readLogMessages decodes the file at path as a stream of JSON objects and
// returns them in order. An empty file yields a nil slice and a nil error;
// an open or decode failure yields a nil slice and the error.
func readLogMessages(path string) ([]message, error) {
	logFile, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer logFile.Close()

	var (
		entries []message
		decoder = json.NewDecoder(logFile)
	)
	for {
		var entry message
		err := decoder.Decode(&entry)
		if err == io.EOF {
			return entries, nil
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
}

View file

@ -1,71 +0,0 @@
package runtime
import (
"errors"
"github.com/docker/containerd/specs"
)
// getRootIDs always returns uid 0, gid 0 and a nil error in this Windows
// variant; the spec argument is not inspected.
func getRootIDs(s *specs.PlatformSpec) (int, int, error) {
return 0, 0, nil
}
// State unconditionally reports Running; no actual process state is
// consulted.
// TODO Windows: This will have a different implementation
func (c *container) State() State {
return Running // HACK HACK HACK
}
// Runtime returns the fixed string "windows".
func (c *container) Runtime() string {
return "windows"
}
// Pause always fails: pausing is not supported on Windows.
func (c *container) Pause() error {
return errors.New("Pause not supported on Windows")
}
// Resume always fails: resuming is not supported on Windows.
func (c *container) Resume() error {
return errors.New("Resume not supported on Windows")
}
// Checkpoints always fails: checkpoints are not supported on Windows.
func (c *container) Checkpoints() ([]Checkpoint, error) {
	// The message no longer ends in a stray space, matching the sibling
	// "not supported on Windows" errors.
	return nil, errors.New("Checkpoints not supported on Windows")
}
// Checkpoint always fails: checkpointing is not supported on Windows.
func (c *container) Checkpoint(cpt Checkpoint) error {
	// The message no longer ends in a stray space, matching the sibling
	// "not supported on Windows" errors.
	return errors.New("Checkpoint not supported on Windows")
}
// DeleteCheckpoint always fails: checkpoints are not supported on Windows.
func (c *container) DeleteCheckpoint(name string) error {
	// The message no longer ends in a stray space, matching the sibling
	// "not supported on Windows" errors.
	return errors.New("DeleteCheckpoint not supported on Windows")
}
// Start is a placeholder that always returns a "not yet implemented" error;
// both arguments are ignored.
// TODO Windows: Implement me.
// This will have a very different implementation on Windows.
func (c *container) Start(checkpoint string, s Stdio) (Process, error) {
return nil, errors.New("Start not yet implemented on Windows")
}
// Exec is a placeholder that always returns a "not yet implemented" error;
// all arguments are ignored.
// TODO Windows: Implement me.
// This will have a very different implementation on Windows.
func (c *container) Exec(pid string, spec specs.ProcessSpec, s Stdio) (Process, error) {
return nil, errors.New("Exec not yet implemented on Windows")
}
// Pids is a placeholder that always returns a "not yet implemented" error.
// TODO Windows: Implement me.
func (c *container) Pids() ([]int, error) {
return nil, errors.New("Pids not yet implemented on Windows")
}
// Stats is a placeholder that always returns a "not yet implemented" error.
// TODO Windows: Implement me. (Not yet supported by docker on Windows either...)
func (c *container) Stats() (*Stat, error) {
return nil, errors.New("Stats not yet implemented on Windows")
}
// Status implements the runtime Container interface.
// On Windows it is a placeholder that always returns an empty State and a
// "not yet implemented" error.
func (c *container) Status() (State, error) {
return "", errors.New("Status not yet implemented on Windows")
}
// OOM is a placeholder that always returns a "not yet implemented" error.
func (c *container) OOM() (OOM, error) {
return nil, errors.New("OOM not yet implemented on Windows")
}

View file

@ -70,7 +70,21 @@ func newProcess(config *processConfig) (*process, error) {
} }
defer f.Close() defer f.Close()
ps := populateProcessStateForEncoding(config, uid, gid) ps := ProcessState{
ProcessSpec: config.processSpec,
Exec: config.exec,
PlatformProcessState: PlatformProcessState{
Checkpoint: config.checkpoint,
RootUID: uid,
RootGID: gid,
},
Stdin: config.stdio.Stdin,
Stdout: config.stdio.Stdout,
Stderr: config.stdio.Stderr,
RuntimeArgs: config.c.runtimeArgs,
NoPivotRoot: config.c.noPivotRoot,
}
if err := json.NewEncoder(f).Encode(ps); err != nil { if err := json.NewEncoder(f).Encode(ps); err != nil {
return nil, err return nil, err
} }
@ -204,3 +218,24 @@ func (p *process) getPidFromFile() (int, error) {
p.pid = i p.pid = i
return i, nil return i, nil
} }
// getExitPipe creates the FIFO at path, tolerating one that already exists,
// and opens it read-only in non-blocking mode.
func getExitPipe(path string) (*os.File, error) {
	err := syscall.Mkfifo(path, 0755)
	if err != nil && !os.IsExist(err) {
		return nil, err
	}
	// add NONBLOCK in case the other side has already closed or else
	// this function would never return
	const flags = syscall.O_RDONLY | syscall.O_NONBLOCK
	return os.OpenFile(path, flags, 0)
}
// getControlPipe creates the FIFO at path, tolerating one that already
// exists, and opens it read-write in non-blocking mode.
func getControlPipe(path string) (*os.File, error) {
	err := syscall.Mkfifo(path, 0755)
	if err != nil && !os.IsExist(err) {
		return nil, err
	}
	const flags = syscall.O_RDWR | syscall.O_NONBLOCK
	return os.OpenFile(path, flags, 0)
}
// Signal sends the provided signal to the process.
//
// s must be a syscall.Signal; any other os.Signal implementation is rejected
// with syscall.EINVAL instead of panicking on the type assertion.
func (p *process) Signal(s os.Signal) error {
	sig, ok := s.(syscall.Signal)
	if !ok {
		return syscall.EINVAL
	}
	return syscall.Kill(p.pid, sig)
}

View file

@ -1,44 +0,0 @@
package runtime
import (
"os"
"syscall"
)
// getExitPipe creates the FIFO at path, tolerating one that already exists,
// and opens it read-only in non-blocking mode.
func getExitPipe(path string) (*os.File, error) {
	err := syscall.Mkfifo(path, 0755)
	if err != nil && !os.IsExist(err) {
		return nil, err
	}
	// add NONBLOCK in case the other side has already closed or else
	// this function would never return
	const flags = syscall.O_RDONLY | syscall.O_NONBLOCK
	return os.OpenFile(path, flags, 0)
}
// getControlPipe creates the FIFO at path, tolerating one that already
// exists, and opens it read-write in non-blocking mode.
func getControlPipe(path string) (*os.File, error) {
	err := syscall.Mkfifo(path, 0755)
	if err != nil && !os.IsExist(err) {
		return nil, err
	}
	const flags = syscall.O_RDWR | syscall.O_NONBLOCK
	return os.OpenFile(path, flags, 0)
}
// Signal sends the provided signal to the process.
//
// s must be a syscall.Signal; any other os.Signal implementation is rejected
// with syscall.EINVAL instead of panicking on the type assertion.
func (p *process) Signal(s os.Signal) error {
	sig, ok := s.(syscall.Signal)
	if !ok {
		return syscall.EINVAL
	}
	return syscall.Kill(p.pid, sig)
}
// populateProcessStateForEncoding builds the ProcessState for config,
// combining the process spec, stdio paths and container-level runtime
// settings with the checkpoint name and the given root uid/gid.
func populateProcessStateForEncoding(config *processConfig, uid int, gid int) ProcessState {
	platform := PlatformProcessState{
		Checkpoint: config.checkpoint,
		RootUID:    uid,
		RootGID:    gid,
	}
	streams := config.stdio
	owner := config.c

	return ProcessState{
		ProcessSpec:          config.processSpec,
		Exec:                 config.exec,
		PlatformProcessState: platform,
		Stdin:                streams.Stdin,
		Stdout:               streams.Stdout,
		Stderr:               streams.Stderr,
		RuntimeArgs:          owner.runtimeArgs,
		NoPivotRoot:          owner.noPivotRoot,
	}
}

View file

@ -32,6 +32,27 @@ const (
InitProcessID = "init" InitProcessID = "init"
) )
// Checkpoint describes a container checkpoint and the options it is taken
// with.
type Checkpoint struct {
// Created is the time that checkpoint happened
Created time.Time `json:"created"`
// Name is the name of the checkpoint
Name string `json:"name"`
// Tcp checkpoints open tcp connections
Tcp bool `json:"tcp"`
// UnixSockets persists unix sockets in the checkpoint
UnixSockets bool `json:"unixSockets"`
// Shell persists tty sessions in the checkpoint
Shell bool `json:"shell"`
// Exit exits the container after the checkpoint is finished
Exit bool `json:"exit"`
}
// PlatformProcessState contains platform-specific fields in the ProcessState structure
type PlatformProcessState struct {
// Checkpoint is the name of the checkpoint associated with the process.
Checkpoint string `json:"checkpoint"`
// RootUID is the uid recorded for the container's root user.
RootUID int `json:"rootUID"`
// RootGID is the gid recorded for the container's root user.
RootGID int `json:"rootGID"`
}
type State string type State string
type Resource struct { type Resource struct {

View file

@ -1,25 +0,0 @@
package runtime
import "time"
// Checkpoint describes a container checkpoint and the options it is taken
// with.
type Checkpoint struct {
// Created is the time that checkpoint happened
Created time.Time `json:"created"`
// Name is the name of the checkpoint
Name string `json:"name"`
// Tcp checkpoints open tcp connections
Tcp bool `json:"tcp"`
// UnixSockets persists unix sockets in the checkpoint
UnixSockets bool `json:"unixSockets"`
// Shell persists tty sessions in the checkpoint
Shell bool `json:"shell"`
// Exit exits the container after the checkpoint is finished
Exit bool `json:"exit"`
}
// PlatformProcessState contains platform-specific fields in the ProcessState structure
type PlatformProcessState struct {
// Checkpoint is the name of the checkpoint associated with the process.
Checkpoint string `json:"checkpoint"`
// RootUID is the uid recorded for the container's root user.
RootUID int `json:"rootUID"`
// RootGID is the gid recorded for the container's root user.
RootGID int `json:"rootGID"`
}

View file

@ -8,7 +8,6 @@ import (
type StartTask struct { type StartTask struct {
baseTask baseTask
platformStartTask
ID string ID string
BundlePath string BundlePath string
Stdout string Stdout string
@ -17,6 +16,7 @@ type StartTask struct {
StartResponse chan StartResponse StartResponse chan StartResponse
Labels []string Labels []string
NoPivotRoot bool NoPivotRoot bool
Checkpoint *runtime.Checkpoint
} }
func (s *Supervisor) start(t *StartTask) error { func (s *Supervisor) start(t *StartTask) error {
@ -47,7 +47,9 @@ func (s *Supervisor) start(t *StartTask) error {
Stdout: t.Stdout, Stdout: t.Stdout,
Stderr: t.Stderr, Stderr: t.Stderr,
} }
task.setTaskCheckpoint(t) if t.Checkpoint != nil {
task.Checkpoint = t.Checkpoint.Name
}
s.startTasks <- task s.startTasks <- task
ContainerCreateTimer.UpdateSince(start) ContainerCreateTimer.UpdateSince(start)

View file

@ -1,13 +0,0 @@
package supervisor
import "github.com/docker/containerd/runtime"
// platformStartTask holds the platform-specific part of a StartTask.
type platformStartTask struct {
// Checkpoint, when non-nil, supplies the checkpoint name that
// setTaskCheckpoint copies onto the start task.
Checkpoint *runtime.Checkpoint
}
// setTaskCheckpoint copies the checkpoint name from t onto task. It does
// nothing when t carries no checkpoint.
func (task *startTask) setTaskCheckpoint(t *StartTask) {
	if t.Checkpoint == nil {
		return
	}
	task.Checkpoint = t.Checkpoint.Name
}

View file

@ -1,6 +1,23 @@
package supervisor package supervisor
import "github.com/cloudfoundry/gosigar"
type Machine struct { type Machine struct {
Cpus int Cpus int
Memory int64 Memory int64
} }
// CollectMachineInformation queries sigar for the CPU list and memory totals
// of the host.
//
// Cpus is the number of entries in the CPU list and Memory is the reported
// total divided by 1024 twice (presumably bytes to MiB; confirm against
// sigar's units). If the memory query fails, the returned Machine still
// carries the CPU count alongside the error.
func CollectMachineInformation() (Machine, error) {
	var (
		machine Machine
		cpus    sigar.CpuList
		memory  sigar.Mem
	)
	if err := cpus.Get(); err != nil {
		return machine, err
	}
	machine.Cpus = len(cpus.List)
	if err := memory.Get(); err != nil {
		return machine, err
	}
	machine.Memory = int64(memory.Total / 1024 / 1024)
	return machine, nil
}

View file

@ -1,18 +0,0 @@
package supervisor
import "github.com/cloudfoundry/gosigar"
// CollectMachineInformation queries sigar for the CPU list and memory totals
// of the host.
//
// Cpus is the number of entries in the CPU list and Memory is the reported
// total divided by 1024 twice (presumably bytes to MiB; confirm against
// sigar's units). If the memory query fails, the returned Machine still
// carries the CPU count alongside the error.
func CollectMachineInformation() (Machine, error) {
	var (
		machine Machine
		cpus    sigar.CpuList
		memory  sigar.Mem
	)
	if err := cpus.Get(); err != nil {
		return machine, err
	}
	machine.Cpus = len(cpus.List)
	if err := memory.Get(); err != nil {
		return machine, err
	}
	machine.Memory = int64(memory.Total / 1024 / 1024)
	return machine, nil
}

View file

@ -345,3 +345,41 @@ func (s *Supervisor) restore() error {
} }
return nil return nil
} }
// handleTask dispatches a task to the supervisor method that handles its
// concrete type and reports the outcome on the task's error channel.
//
// Unknown task types yield ErrUnknownTask. When a handler returns
// errDeferredResponse, nothing is sent and the channel is left open here
// (presumably the handler answers later; confirm against the handlers).
// Otherwise the result, nil included, is sent and the channel is closed.
func (s *Supervisor) handleTask(i Task) {
	var err error
	switch task := i.(type) {
	case *AddProcessTask:
		err = s.addProcess(task)
	case *CreateCheckpointTask:
		err = s.createCheckpoint(task)
	case *DeleteCheckpointTask:
		err = s.deleteCheckpoint(task)
	case *StartTask:
		err = s.start(task)
	case *DeleteTask:
		err = s.delete(task)
	case *ExitTask:
		err = s.exit(task)
	case *ExecExitTask:
		err = s.execExit(task)
	case *GetContainersTask:
		err = s.getContainers(task)
	case *SignalTask:
		err = s.signal(task)
	case *StatsTask:
		err = s.stats(task)
	case *UpdateTask:
		err = s.updateContainer(task)
	case *UpdateProcessTask:
		err = s.updateProcess(task)
	case *OOMTask:
		err = s.oom(task)
	default:
		err = ErrUnknownTask
	}

	if err == errDeferredResponse {
		return
	}
	i.ErrorCh() <- err
	close(i.ErrorCh())
}

View file

@ -1,39 +0,0 @@
package supervisor
// handleTask dispatches a task to the supervisor method that handles its
// concrete type and reports the outcome on the task's error channel.
//
// Unknown task types yield ErrUnknownTask. When a handler returns
// errDeferredResponse, nothing is sent and the channel is left open here
// (presumably the handler answers later; confirm against the handlers).
// Otherwise the result, nil included, is sent and the channel is closed.
func (s *Supervisor) handleTask(i Task) {
	var err error
	switch task := i.(type) {
	case *AddProcessTask:
		err = s.addProcess(task)
	case *CreateCheckpointTask:
		err = s.createCheckpoint(task)
	case *DeleteCheckpointTask:
		err = s.deleteCheckpoint(task)
	case *StartTask:
		err = s.start(task)
	case *DeleteTask:
		err = s.delete(task)
	case *ExitTask:
		err = s.exit(task)
	case *ExecExitTask:
		err = s.execExit(task)
	case *GetContainersTask:
		err = s.getContainers(task)
	case *SignalTask:
		err = s.signal(task)
	case *StatsTask:
		err = s.stats(task)
	case *UpdateTask:
		err = s.updateContainer(task)
	case *UpdateProcessTask:
		err = s.updateProcess(task)
	case *OOMTask:
		err = s.oom(task)
	default:
		err = ErrUnknownTask
	}

	if err == errDeferredResponse {
		return
	}
	i.ErrorCh() <- err
	close(i.ErrorCh())
}