Merge pull request #118 from jhowardmsft/grpc

Final bit of compiling on Windows
This commit is contained in:
Michael Crosby 2016-02-26 15:52:54 -08:00
commit b413f33b99
7 changed files with 361 additions and 269 deletions

View file

@ -1,12 +1,8 @@
package server
import (
"bufio"
"errors"
"fmt"
"os"
"strconv"
"strings"
"syscall"
"time"
@ -16,10 +12,6 @@ import (
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/supervisor"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/opencontainers/specs"
"golang.org/x/net/context"
)
@ -46,11 +38,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
e.Stderr = c.Stderr
e.Labels = c.Labels
e.StartResponse = make(chan supervisor.StartResponse, 1)
if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint,
}
}
createContainerConfigCheckpoint(e, c)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
@ -83,12 +71,9 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
Args: r.Args,
Env: r.Env,
Cwd: r.Cwd,
User: specs.User{
UID: r.User.Uid,
GID: r.User.Gid,
AdditionalGids: r.User.AdditionalGids,
},
}
setPlatformRuntimeProcessSpecUserFields(r.User, process)
if r.Id == "" {
return nil, fmt.Errorf("container id cannot be empty")
}
@ -111,73 +96,6 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
return &types.AddProcessResponse{}, nil
}
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
e := &supervisor.CreateCheckpointTask{}
e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{
Name: r.Checkpoint.Name,
Exit: r.Checkpoint.Exit,
Tcp: r.Checkpoint.Tcp,
UnixSockets: r.Checkpoint.UnixSockets,
Shell: r.Checkpoint.Shell,
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.CreateCheckpointResponse{}, nil
}
func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
if r.Name == "" {
return nil, errors.New("checkpoint name cannot be empty")
}
e := &supervisor.DeleteCheckpointTask{}
e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{
Name: r.Name,
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.DeleteCheckpointResponse{}, nil
}
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
e := &supervisor.GetContainersTask{}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
var container runtime.Container
for _, c := range e.Containers {
if c.ID() == r.Id {
container = c
break
}
}
if container == nil {
return nil, grpc.Errorf(codes.NotFound, "no such containers")
}
var out []*types.Checkpoint
checkpoints, err := container.Checkpoints()
if err != nil {
return nil, err
}
for _, c := range checkpoints {
out = append(out, &types.Checkpoint{
Name: c.Name,
Tcp: c.Tcp,
Shell: c.Shell,
UnixSockets: c.UnixSockets,
// TODO: figure out timestamp
//Timestamp: c.Timestamp,
})
}
return &types.ListCheckpointResponse{Checkpoints: out}, nil
}
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
e := &supervisor.GetContainersTask{}
e.ID = r.Id
@ -211,7 +129,7 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er
for _, p := range processes {
oldProc := p.Spec()
stdio := p.Stdio()
procs = append(procs, &types.Process{
appendToProcs := &types.Process{
Pid: p.ID(),
SystemPid: uint32(p.SystemPid()),
Terminal: oldProc.Terminal,
@ -221,12 +139,9 @@ func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, er
Stdin: stdio.Stdin,
Stdout: stdio.Stdout,
Stderr: stdio.Stderr,
User: &types.User{
Uid: oldProc.User.UID,
Gid: oldProc.User.GID,
AdditionalGids: oldProc.User.AdditionalGids,
},
})
}
setUserFieldsInProcess(appendToProcs, oldProc)
procs = append(procs, appendToProcs)
}
var pids []int
if getPids {
@ -297,149 +212,3 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
}
return nil
}
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
e := &supervisor.StatsTask{}
e.ID = r.Id
e.Stat = make(chan *runtime.Stat, 1)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
stats := <-e.Stat
t := convertToPb(stats)
return t, nil
}
func convertToPb(st *runtime.Stat) *types.StatsResponse {
pbSt := &types.StatsResponse{
Timestamp: uint64(st.Timestamp.Unix()),
CgroupStats: &types.CgroupStats{},
}
lcSt, ok := st.Data.(*libcontainer.Stats)
if !ok {
return pbSt
}
cpuSt := lcSt.CgroupStats.CpuStats
systemUsage, _ := getSystemCPUUsage()
pbSt.CgroupStats.CpuStats = &types.CpuStats{
CpuUsage: &types.CpuUsage{
TotalUsage: cpuSt.CpuUsage.TotalUsage,
PercpuUsage: cpuSt.CpuUsage.PercpuUsage,
UsageInKernelmode: cpuSt.CpuUsage.UsageInKernelmode,
UsageInUsermode: cpuSt.CpuUsage.UsageInUsermode,
},
ThrottlingData: &types.ThrottlingData{
Periods: cpuSt.ThrottlingData.Periods,
ThrottledPeriods: cpuSt.ThrottlingData.ThrottledPeriods,
ThrottledTime: cpuSt.ThrottlingData.ThrottledTime,
},
SystemUsage: systemUsage,
}
memSt := lcSt.CgroupStats.MemoryStats
pbSt.CgroupStats.MemoryStats = &types.MemoryStats{
Cache: memSt.Cache,
Usage: &types.MemoryData{
Usage: memSt.Usage.Usage,
MaxUsage: memSt.Usage.MaxUsage,
Failcnt: memSt.Usage.Failcnt,
Limit: memSt.Usage.Limit,
},
SwapUsage: &types.MemoryData{
Usage: memSt.SwapUsage.Usage,
MaxUsage: memSt.SwapUsage.MaxUsage,
Failcnt: memSt.SwapUsage.Failcnt,
Limit: memSt.SwapUsage.Limit,
},
KernelUsage: &types.MemoryData{
Usage: memSt.KernelUsage.Usage,
MaxUsage: memSt.KernelUsage.MaxUsage,
Failcnt: memSt.KernelUsage.Failcnt,
Limit: memSt.KernelUsage.Limit,
},
}
blkSt := lcSt.CgroupStats.BlkioStats
pbSt.CgroupStats.BlkioStats = &types.BlkioStats{
IoServiceBytesRecursive: convertBlkioEntryToPb(blkSt.IoServiceBytesRecursive),
IoServicedRecursive: convertBlkioEntryToPb(blkSt.IoServicedRecursive),
IoQueuedRecursive: convertBlkioEntryToPb(blkSt.IoQueuedRecursive),
IoServiceTimeRecursive: convertBlkioEntryToPb(blkSt.IoServiceTimeRecursive),
IoWaitTimeRecursive: convertBlkioEntryToPb(blkSt.IoWaitTimeRecursive),
IoMergedRecursive: convertBlkioEntryToPb(blkSt.IoMergedRecursive),
IoTimeRecursive: convertBlkioEntryToPb(blkSt.IoTimeRecursive),
SectorsRecursive: convertBlkioEntryToPb(blkSt.SectorsRecursive),
}
pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats)
for k, st := range lcSt.CgroupStats.HugetlbStats {
pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{
Usage: st.Usage,
MaxUsage: st.MaxUsage,
Failcnt: st.Failcnt,
}
}
return pbSt
}
func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry {
var pbEs []*types.BlkioStatsEntry
for _, e := range b {
pbEs = append(pbEs, &types.BlkioStatsEntry{
Major: e.Major,
Minor: e.Minor,
Op: e.Op,
Value: e.Value,
})
}
return pbEs
}
const nanoSecondsPerSecond = 1e9
var clockTicksPerSecond = uint64(system.GetClockTicks())
// getSystemCPUUsage returns the host system's cpu usage in
// nanoseconds. An error is returned if the format of the underlying
// file does not match.
//
// Uses /proc/stat defined by POSIX. Looks for the cpu
// statistics line and then sums up the first seven fields
// provided. See `man 5 proc` for details on specific field
// information.
func getSystemCPUUsage() (uint64, error) {
var line string
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
bufReader := bufio.NewReaderSize(nil, 128)
defer func() {
bufReader.Reset(nil)
f.Close()
}()
bufReader.Reset(f)
err = nil
for err == nil {
line, err = bufReader.ReadString('\n')
if err != nil {
break
}
parts := strings.Fields(line)
switch parts[0] {
case "cpu":
if len(parts) < 8 {
return 0, fmt.Errorf("bad format of cpu stats")
}
var totalClockTicks uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)
if err != nil {
return 0, fmt.Errorf("error parsing cpu stats")
}
totalClockTicks += v
}
return (totalClockTicks * nanoSecondsPerSecond) /
clockTicksPerSecond, nil
}
}
return 0, fmt.Errorf("bad stats format")
}

View file

@ -0,0 +1,258 @@
package server
import (
"bufio"
"errors"
"fmt"
"os"
"strconv"
"strings"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/supervisor"
"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/opencontainers/specs"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func createContainerConfigCheckpoint(e *supervisor.StartTask, c *types.CreateContainerRequest) {
if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint,
}
}
}
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
e := &supervisor.CreateCheckpointTask{}
e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{
Name: r.Checkpoint.Name,
Exit: r.Checkpoint.Exit,
Tcp: r.Checkpoint.Tcp,
UnixSockets: r.Checkpoint.UnixSockets,
Shell: r.Checkpoint.Shell,
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.CreateCheckpointResponse{}, nil
}
func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
if r.Name == "" {
return nil, errors.New("checkpoint name cannot be empty")
}
e := &supervisor.DeleteCheckpointTask{}
e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{
Name: r.Name,
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.DeleteCheckpointResponse{}, nil
}
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
e := &supervisor.GetContainersTask{}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
var container runtime.Container
for _, c := range e.Containers {
if c.ID() == r.Id {
container = c
break
}
}
if container == nil {
return nil, grpc.Errorf(codes.NotFound, "no such containers")
}
var out []*types.Checkpoint
checkpoints, err := container.Checkpoints()
if err != nil {
return nil, err
}
for _, c := range checkpoints {
out = append(out, &types.Checkpoint{
Name: c.Name,
Tcp: c.Tcp,
Shell: c.Shell,
UnixSockets: c.UnixSockets,
// TODO: figure out timestamp
//Timestamp: c.Timestamp,
})
}
return &types.ListCheckpointResponse{Checkpoints: out}, nil
}
func convertToPb(st *runtime.Stat) *types.StatsResponse {
pbSt := &types.StatsResponse{
Timestamp: uint64(st.Timestamp.Unix()),
CgroupStats: &types.CgroupStats{},
}
lcSt, ok := st.Data.(*libcontainer.Stats)
if !ok {
return pbSt
}
cpuSt := lcSt.CgroupStats.CpuStats
systemUsage, _ := getSystemCPUUsage()
pbSt.CgroupStats.CpuStats = &types.CpuStats{
CpuUsage: &types.CpuUsage{
TotalUsage: cpuSt.CpuUsage.TotalUsage,
PercpuUsage: cpuSt.CpuUsage.PercpuUsage,
UsageInKernelmode: cpuSt.CpuUsage.UsageInKernelmode,
UsageInUsermode: cpuSt.CpuUsage.UsageInUsermode,
},
ThrottlingData: &types.ThrottlingData{
Periods: cpuSt.ThrottlingData.Periods,
ThrottledPeriods: cpuSt.ThrottlingData.ThrottledPeriods,
ThrottledTime: cpuSt.ThrottlingData.ThrottledTime,
},
SystemUsage: systemUsage,
}
memSt := lcSt.CgroupStats.MemoryStats
pbSt.CgroupStats.MemoryStats = &types.MemoryStats{
Cache: memSt.Cache,
Usage: &types.MemoryData{
Usage: memSt.Usage.Usage,
MaxUsage: memSt.Usage.MaxUsage,
Failcnt: memSt.Usage.Failcnt,
Limit: memSt.Usage.Limit,
},
SwapUsage: &types.MemoryData{
Usage: memSt.SwapUsage.Usage,
MaxUsage: memSt.SwapUsage.MaxUsage,
Failcnt: memSt.SwapUsage.Failcnt,
Limit: memSt.SwapUsage.Limit,
},
KernelUsage: &types.MemoryData{
Usage: memSt.KernelUsage.Usage,
MaxUsage: memSt.KernelUsage.MaxUsage,
Failcnt: memSt.KernelUsage.Failcnt,
Limit: memSt.KernelUsage.Limit,
},
}
blkSt := lcSt.CgroupStats.BlkioStats
pbSt.CgroupStats.BlkioStats = &types.BlkioStats{
IoServiceBytesRecursive: convertBlkioEntryToPb(blkSt.IoServiceBytesRecursive),
IoServicedRecursive: convertBlkioEntryToPb(blkSt.IoServicedRecursive),
IoQueuedRecursive: convertBlkioEntryToPb(blkSt.IoQueuedRecursive),
IoServiceTimeRecursive: convertBlkioEntryToPb(blkSt.IoServiceTimeRecursive),
IoWaitTimeRecursive: convertBlkioEntryToPb(blkSt.IoWaitTimeRecursive),
IoMergedRecursive: convertBlkioEntryToPb(blkSt.IoMergedRecursive),
IoTimeRecursive: convertBlkioEntryToPb(blkSt.IoTimeRecursive),
SectorsRecursive: convertBlkioEntryToPb(blkSt.SectorsRecursive),
}
pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats)
for k, st := range lcSt.CgroupStats.HugetlbStats {
pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{
Usage: st.Usage,
MaxUsage: st.MaxUsage,
Failcnt: st.Failcnt,
}
}
return pbSt
}
func convertBlkioEntryToPb(b []cgroups.BlkioStatEntry) []*types.BlkioStatsEntry {
var pbEs []*types.BlkioStatsEntry
for _, e := range b {
pbEs = append(pbEs, &types.BlkioStatsEntry{
Major: e.Major,
Minor: e.Minor,
Op: e.Op,
Value: e.Value,
})
}
return pbEs
}
const nanoSecondsPerSecond = 1e9
var clockTicksPerSecond = uint64(system.GetClockTicks())
// getSystemCPUUsage returns the host system's cpu usage in
// nanoseconds. An error is returned if the format of the underlying
// file does not match.
//
// Uses /proc/stat defined by POSIX. Looks for the cpu
// statistics line and then sums up the first seven fields
// provided. See `man 5 proc` for details on specific field
// information.
func getSystemCPUUsage() (uint64, error) {
var line string
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
bufReader := bufio.NewReaderSize(nil, 128)
defer func() {
bufReader.Reset(nil)
f.Close()
}()
bufReader.Reset(f)
err = nil
for err == nil {
line, err = bufReader.ReadString('\n')
if err != nil {
break
}
parts := strings.Fields(line)
switch parts[0] {
case "cpu":
if len(parts) < 8 {
return 0, fmt.Errorf("bad format of cpu stats")
}
var totalClockTicks uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)
if err != nil {
return 0, fmt.Errorf("error parsing cpu stats")
}
totalClockTicks += v
}
return (totalClockTicks * nanoSecondsPerSecond) /
clockTicksPerSecond, nil
}
}
return 0, fmt.Errorf("bad stats format")
}
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
e := &supervisor.StatsTask{}
e.ID = r.Id
e.Stat = make(chan *runtime.Stat, 1)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
stats := <-e.Stat
t := convertToPb(stats)
return t, nil
}
func setUserFieldsInProcess(p *types.Process, oldProc runtime.ProcessSpec) {
p.User = &types.User{
Uid: oldProc.User.UID,
Gid: oldProc.User.GID,
AdditionalGids: oldProc.User.AdditionalGids,
}
}
func setPlatformRuntimeProcessSpecUserFields(r *types.User, process *runtime.ProcessSpec) {
process.User = specs.User{
UID: r.Uid,
GID: r.Gid,
AdditionalGids: r.AdditionalGids,
}
}

View file

@ -0,0 +1,39 @@
package server
import (
"errors"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/supervisor"
"golang.org/x/net/context"
)
// noop on Windows (Checkpoints not supported)
func createContainerConfigCheckpoint(e *supervisor.StartTask, c *types.CreateContainerRequest) {
}
// TODO Windows - may be able to completely factor out
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
return nil, errors.New("CreateCheckpoint() not supported on Windows")
}
// TODO Windows - may be able to completely factor out
func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
return nil, errors.New("DeleteCheckpoint() not supported on Windows")
}
// TODO Windows - may be able to completely factor out
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
return nil, errors.New("ListCheckpoint() not supported on Windows")
}
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
return nil, errors.New("Stats() not supported on Windows")
}
func setUserFieldsInProcess(p *types.Process, oldProc runtime.ProcessSpec) {
}
func setPlatformRuntimeProcessSpecUserFields(r *types.User, process *runtime.ProcessSpec) {
}

View file

@ -4,10 +4,8 @@ import (
"log"
"net"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"google.golang.org/grpc"
@ -92,22 +90,6 @@ func main() {
}
}
func checkLimits() error {
var l syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &l); err != nil {
return err
}
if l.Cur <= minRlimit {
logrus.WithFields(logrus.Fields{
"current": l.Cur,
"max": l.Max,
}).Warn("containerd: low RLIMIT_NOFILE changing to max")
l.Cur = l.Max
return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &l)
}
return nil
}
func debugMetrics(interval time.Duration, graphiteAddr string) error {
for name, m := range supervisor.Metrics() {
if err := metrics.DefaultRegistry.Register(name, m); err != nil {
@ -191,19 +173,6 @@ func daemon(address, stateDir string, concurrency int, oom bool) error {
return s.Serve(l)
}
func reapProcesses() {
s := make(chan os.Signal, 2048)
signal.Notify(s, syscall.SIGCHLD)
if err := osutils.SetSubreaper(1); err != nil {
logrus.WithField("error", err).Error("containerd: set subpreaper")
}
for range s {
if _, err := osutils.Reap(); err != nil {
logrus.WithField("error", err).Error("containerd: reap child processes")
}
}
}
// getDefaultID returns the hostname for the instance host
func getDefaultID() string {
hostname, err := os.Hostname()

39
containerd/main_linux.go Normal file
View file

@ -0,0 +1,39 @@
package main
import (
"os"
"os/signal"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/osutils"
)
func checkLimits() error {
var l syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &l); err != nil {
return err
}
if l.Cur <= minRlimit {
logrus.WithFields(logrus.Fields{
"current": l.Cur,
"max": l.Max,
}).Warn("containerd: low RLIMIT_NOFILE changing to max")
l.Cur = l.Max
return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &l)
}
return nil
}
func reapProcesses() {
s := make(chan os.Signal, 2048)
signal.Notify(s, syscall.SIGCHLD)
if err := osutils.SetSubreaper(1); err != nil {
logrus.WithField("error", err).Error("containerd: set subpreaper")
}
for range s {
if _, err := osutils.Reap(); err != nil {
logrus.WithField("error", err).Error("containerd: reap child processes")
}
}
}

View file

@ -0,0 +1,10 @@
package main
// TODO Windows: May be able to factor out entirely
func checkLimits() error {
return nil
}
// No idea how to implement this on Windows.
func reapProcesses() {
}

View file

@ -1 +1,9 @@
// +build windows
package osutils
// GetOpenFds returns the number of open fds for the process provided by pid
// Not supported on Windows (same as for docker daemon)
func GetOpenFds(pid int) (int, error) {
return -1, nil
}