add better generate

Signed-off-by: Jess Frazelle <acidburn@microsoft.com>
Jess Frazelle 2018-03-20 01:33:56 -04:00
parent 3fc6abf56b
commit cdd93563f5
5655 changed files with 1187011 additions and 392 deletions

@@ -0,0 +1,877 @@
// +build !windows

package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/events"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/content"
containerderrors "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/typeurl"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/ioutils"
"github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// InitProcessName is the name given to the first process of a
// container
const InitProcessName = "init"
type container struct {
mu sync.Mutex
bundleDir string
ctr containerd.Container
task containerd.Task
execs map[string]containerd.Process
oomKilled bool
}
func (c *container) setTask(t containerd.Task) {
c.mu.Lock()
c.task = t
c.mu.Unlock()
}
func (c *container) getTask() containerd.Task {
c.mu.Lock()
t := c.task
c.mu.Unlock()
return t
}
func (c *container) addProcess(id string, p containerd.Process) {
c.mu.Lock()
if c.execs == nil {
c.execs = make(map[string]containerd.Process)
}
c.execs[id] = p
c.mu.Unlock()
}
func (c *container) deleteProcess(id string) {
c.mu.Lock()
delete(c.execs, id)
c.mu.Unlock()
}
func (c *container) getProcess(id string) containerd.Process {
c.mu.Lock()
p := c.execs[id]
c.mu.Unlock()
return p
}
func (c *container) setOOMKilled(killed bool) {
c.mu.Lock()
c.oomKilled = killed
c.mu.Unlock()
}
func (c *container) getOOMKilled() bool {
c.mu.Lock()
killed := c.oomKilled
c.mu.Unlock()
return killed
}
type client struct {
sync.RWMutex // protects containers map
remote *containerd.Client
stateDir string
logger *logrus.Entry
namespace string
backend Backend
eventQ queue
containers map[string]*container
}
func (c *client) setRemote(remote *containerd.Client) {
c.Lock()
c.remote = remote
c.Unlock()
}
func (c *client) getRemote() *containerd.Client {
c.RLock()
remote := c.remote
c.RUnlock()
return remote
}
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return c.getRemote().Version(ctx)
}
func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
c.Lock()
defer c.Unlock()
var dio *cio.DirectIO
defer func() {
if err != nil && dio != nil {
dio.Cancel()
dio.Close()
}
err = wrapError(err)
}()
ctr, err := c.remote.LoadContainer(ctx, id)
if err != nil {
return false, -1, errors.WithStack(err)
}
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
// dio must be assigned to the previously defined dio for the defer above
// to handle cleanup
dio, err = cio.NewDirectIO(ctx, fifos)
if err != nil {
return nil, err
}
return attachStdio(dio)
}
t, err := ctr.Task(ctx, attachIO)
if err != nil && !containerderrors.IsNotFound(err) {
return false, -1, err
}
if t != nil {
s, err := t.Status(ctx)
if err != nil {
return false, -1, err
}
alive = s.Status != containerd.Stopped
pid = int(t.Pid())
}
c.containers[id] = &container{
bundleDir: filepath.Join(c.stateDir, id),
ctr: ctr,
task: t,
// TODO(mlaventure): load execs
}
c.logger.WithFields(logrus.Fields{
"container": id,
"alive": alive,
"pid": pid,
}).Debug("restored container")
return alive, pid, nil
}
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
if ctr := c.getContainer(id); ctr != nil {
return errors.WithStack(newConflictError("id already in use"))
}
bdir, err := prepareBundleDir(filepath.Join(c.stateDir, id), ociSpec)
if err != nil {
return errdefs.System(errors.Wrap(err, "prepare bundle dir failed"))
}
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
cdCtr, err := c.getRemote().NewContainer(ctx, id,
containerd.WithSpec(ociSpec),
// TODO(mlaventure): when containerd support lcow, revisit runtime value
containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
if err != nil {
return err
}
c.Lock()
c.containers[id] = &container{
bundleDir: bdir,
ctr: cdCtr,
}
c.Unlock()
return nil
}
// Start creates and starts a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(id)
if ctr == nil {
return -1, errors.WithStack(newNotFoundError("no such container"))
}
if t := ctr.getTask(); t != nil {
return -1, errors.WithStack(newConflictError("container already started"))
}
var (
cp *types.Descriptor
t containerd.Task
rio cio.IO
err error
stdinCloseSync = make(chan struct{})
)
if checkpointDir != "" {
// write checkpoint to the content store
tar := archive.Diff(ctx, "", checkpointDir)
cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
// remove the checkpoint when we're done
defer func() {
if cp != nil {
err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"ref": checkpointDir,
"digest": cp.Digest,
}).Warnf("failed to delete temporary checkpoint entry")
}
}
}()
if err := tar.Close(); err != nil {
return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
}
if err != nil {
return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
}
}
spec, err := ctr.ctr.Spec(ctx)
if err != nil {
return -1, errors.Wrap(err, "failed to retrieve spec")
}
uid, gid := getSpecUser(spec)
t, err = ctr.ctr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
return rio, err
},
func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
info.Checkpoint = cp
info.Options = &runctypes.CreateOptions{
IoUid: uint32(uid),
IoGid: uint32(gid),
NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
}
return nil
})
if err != nil {
close(stdinCloseSync)
if rio != nil {
rio.Cancel()
rio.Close()
}
return -1, err
}
ctr.setTask(t)
// Signal c.createIO that it can call CloseIO
close(stdinCloseSync)
if err := t.Start(ctx); err != nil {
if _, err := t.Delete(ctx); err != nil {
c.logger.WithError(err).WithField("container", id).
Error("failed to delete task after fail start")
}
ctr.setTask(nil)
return -1, err
}
return int(t.Pid()), nil
}
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return -1, errors.WithStack(newNotFoundError("no such container"))
}
t := ctr.getTask()
if t == nil {
return -1, errors.WithStack(newInvalidParameterError("container is not running"))
}
if p := ctr.getProcess(processID); p != nil {
return -1, errors.WithStack(newConflictError("id already in use"))
}
var (
p containerd.Process
rio cio.IO
err error
stdinCloseSync = make(chan struct{})
)
fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
defer func() {
if err != nil {
if rio != nil {
rio.Cancel()
rio.Close()
}
}
}()
p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
return rio, err
})
if err != nil {
close(stdinCloseSync)
return -1, err
}
ctr.addProcess(processID, p)
// Signal c.createIO that it can call CloseIO
close(stdinCloseSync)
if err = p.Start(ctx); err != nil {
p.Delete(context.Background())
ctr.deleteProcess(processID)
return -1, err
}
return int(p.Pid()), nil
}
func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
p, err := c.getProcess(containerID, processID)
if err != nil {
return err
}
return wrapError(p.Kill(ctx, syscall.Signal(signal)))
}
func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
p, err := c.getProcess(containerID, processID)
if err != nil {
return err
}
return p.Resize(ctx, uint32(width), uint32(height))
}
func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
p, err := c.getProcess(containerID, processID)
if err != nil {
return err
}
return p.CloseIO(ctx, containerd.WithStdinCloser)
}
func (c *client) Pause(ctx context.Context, containerID string) error {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return err
}
return p.(containerd.Task).Pause(ctx)
}
func (c *client) Resume(ctx context.Context, containerID string) error {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return err
}
return p.(containerd.Task).Resume(ctx)
}
func (c *client) Stats(ctx context.Context, containerID string) (*Stats, error) {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return nil, err
}
m, err := p.(containerd.Task).Metrics(ctx)
if err != nil {
return nil, err
}
v, err := typeurl.UnmarshalAny(m.Data)
if err != nil {
return nil, err
}
return interfaceToStats(m.Timestamp, v), nil
}
func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return nil, err
}
pis, err := p.(containerd.Task).Pids(ctx)
if err != nil {
return nil, err
}
var pids []uint32
for _, i := range pis {
pids = append(pids, i.Pid)
}
return pids, nil
}
func (c *client) Summary(ctx context.Context, containerID string) ([]Summary, error) {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return nil, err
}
pis, err := p.(containerd.Task).Pids(ctx)
if err != nil {
return nil, err
}
var infos []Summary
for _, pi := range pis {
i, err := typeurl.UnmarshalAny(pi.Info)
if err != nil {
return nil, errors.Wrap(err, "unable to decode process details")
}
s, err := summaryFromInterface(i)
if err != nil {
return nil, err
}
infos = append(infos, *s)
}
return infos, nil
}
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return 255, time.Now(), nil
}
status, err := p.(containerd.Task).Delete(ctx)
if err != nil {
return 255, time.Now(), nil
}
if ctr := c.getContainer(containerID); ctr != nil {
ctr.setTask(nil)
}
return status.ExitCode(), status.ExitTime(), nil
}
func (c *client) Delete(ctx context.Context, containerID string) error {
ctr := c.getContainer(containerID)
if ctr == nil {
return errors.WithStack(newNotFoundError("no such container"))
}
if err := ctr.ctr.Delete(ctx); err != nil {
return err
}
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
if err := os.RemoveAll(ctr.bundleDir); err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": containerID,
"bundle": ctr.bundleDir,
}).Error("failed to remove state dir")
}
}
c.removeContainer(containerID)
return nil
}
func (c *client) Status(ctx context.Context, containerID string) (Status, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
}
t := ctr.getTask()
if t == nil {
return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
}
s, err := t.Status(ctx)
if err != nil {
return StatusUnknown, err
}
return Status(s.Status), nil
}
func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return err
}
img, err := p.(containerd.Task).Checkpoint(ctx)
if err != nil {
return err
}
// Whatever happens, delete the checkpoint from containerd
defer func() {
err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
if err != nil {
c.logger.WithError(err).WithField("digest", img.Target().Digest).
Warnf("failed to delete checkpoint image")
}
}()
b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target().Digest)
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
}
var index v1.Index
if err := json.Unmarshal(b, &index); err != nil {
return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
}
var cpDesc *v1.Descriptor
for _, m := range index.Manifests {
if m.MediaType == images.MediaTypeContainerd1Checkpoint {
cpDesc = &m
break
}
}
if cpDesc == nil {
return errdefs.System(errors.New("invalid checkpoint"))
}
rat, err := c.getRemote().ContentStore().ReaderAt(ctx, cpDesc.Digest)
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
}
defer rat.Close()
_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
}
return err
}
func (c *client) getContainer(id string) *container {
c.RLock()
ctr := c.containers[id]
c.RUnlock()
return ctr
}
func (c *client) removeContainer(id string) {
c.Lock()
delete(c.containers, id)
c.Unlock()
}
func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
ctr := c.getContainer(containerID)
if ctr == nil {
return nil, errors.WithStack(newNotFoundError("no such container"))
}
t := ctr.getTask()
if t == nil {
return nil, errors.WithStack(newNotFoundError("container is not running"))
}
if processID == InitProcessName {
return t, nil
}
p := ctr.getProcess(processID)
if p == nil {
return nil, errors.WithStack(newNotFoundError("no such exec"))
}
return p, nil
}
// createIO creates the io to be used by a process.
// The stdin closer looks the process up by containerID/processID rather than
// holding it directly, because the process may not have been registered yet
// by the time the closure runs.
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
io, err := cio.NewDirectIO(context.Background(), fifos)
if err != nil {
return nil, err
}
if io.Stdin != nil {
var (
err error
stdinOnce sync.Once
)
pipe := io.Stdin
io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
stdinOnce.Do(func() {
err = pipe.Close()
// Do the rest in a new routine to avoid a deadlock if the
// Exec/Start call failed.
go func() {
<-stdinCloseSync
p, err := c.getProcess(containerID, processID)
if err == nil {
err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
if err != nil && strings.Contains(err.Error(), "transport is closing") {
err = nil
}
}
}()
})
return err
})
}
rio, err := attachStdio(io)
if err != nil {
io.Cancel()
io.Close()
}
return rio, err
}
func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
c.eventQ.append(ei.ContainerID, func() {
err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"event": et,
"event-info": ei,
}).Error("failed to process event")
}
if et == EventExit && ei.ProcessID != ei.ContainerID {
p := ctr.getProcess(ei.ProcessID)
if p == nil {
c.logger.WithError(errors.New("no such process")).
WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Error("exit event")
return
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
ctr.deleteProcess(ei.ProcessID)
ctr := c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
}).Error("failed to find container")
} else {
newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
}
}
})
}
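// processEventStream subscribes to containerd's task events for this client's
// namespace and forwards them to the backend. If the stream breaks for any
// reason other than a graceful shutdown, a new subscription is started.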
func (c *client) processEventStream(ctx context.Context) {
var (
err error
eventStream eventsapi.Events_SubscribeClient
ev *eventsapi.Envelope
et EventType
ei EventInfo
ctr *container
)
defer func() {
if err != nil {
select {
case <-ctx.Done():
c.logger.WithError(ctx.Err()).
Info("stopping event stream following graceful shutdown")
default:
go c.processEventStream(ctx)
}
}
}()
eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: []string{
// Filter on both namespace *and* topic. To create an "and" filter,
// this must be a single, comma-separated string
"namespace==" + c.namespace + ",topic~=|^/tasks/|",
},
}, grpc.FailFast(false))
if err != nil {
return
}
c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
var oomKilled bool
for {
ev, err = eventStream.Recv()
if err != nil {
errStatus, ok := status.FromError(err)
if !ok || errStatus.Code() != codes.Canceled {
c.logger.WithError(err).Error("failed to get event")
}
return
}
if ev.Event == nil {
c.logger.WithField("event", ev).Warn("invalid event")
continue
}
v, err := typeurl.UnmarshalAny(ev.Event)
if err != nil {
c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
continue
}
c.logger.WithField("topic", ev.Topic).Debug("event")
switch t := v.(type) {
case *events.TaskCreate:
et = EventCreate
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *events.TaskStart:
et = EventStart
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *events.TaskExit:
et = EventExit
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ID,
Pid: t.Pid,
ExitCode: t.ExitStatus,
ExitedAt: t.ExitedAt,
}
case *events.TaskOOM:
et = EventOOM
ei = EventInfo{
ContainerID: t.ContainerID,
OOMKilled: true,
}
oomKilled = true
case *events.TaskExecAdded:
et = EventExecAdded
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
}
case *events.TaskExecStarted:
et = EventExecStarted
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
Pid: t.Pid,
}
case *events.TaskPaused:
et = EventPaused
ei = EventInfo{
ContainerID: t.ContainerID,
}
case *events.TaskResumed:
et = EventResumed
ei = EventInfo{
ContainerID: t.ContainerID,
}
default:
c.logger.WithFields(logrus.Fields{
"topic": ev.Topic,
"type": reflect.TypeOf(t)},
).Info("ignoring event")
continue
}
ctr = c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
continue
}
if oomKilled {
ctr.setOOMKilled(true)
oomKilled = false
}
ei.OOMKilled = ctr.getOOMKilled()
c.processEvent(ctr, et, ei)
}
}
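// writeContent copies r into containerd's content store under the given ref
// and returns a descriptor for it. The blob is labeled as a GC root so the
// garbage collector does not prune it while it is still in use.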
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := c.getRemote().ContentStore().Writer(ctx, ref, 0, "")
if err != nil {
return nil, err
}
defer writer.Close()
size, err := io.Copy(writer, r)
if err != nil {
return nil, err
}
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}
if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
return nil, err
}
return &types.Descriptor{
MediaType: mediaType,
Digest: writer.Digest(),
Size_: size,
}, nil
}
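// wrapError converts containerd "not found" errors, and errors whose message
// indicates a missing container, into errdefs.NotFound values.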
func wrapError(err error) error {
switch {
case err == nil:
return nil
case containerderrors.IsNotFound(err):
return errdefs.NotFound(err)
}
msg := err.Error()
for _, s := range []string{"container does not exist", "not found", "no such container"} {
if strings.Contains(msg, s) {
return errdefs.NotFound(err)
}
}
return err
}

@@ -0,0 +1,108 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/pkg/idtools"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
)
func summaryFromInterface(i interface{}) (*Summary, error) {
return &Summary{}, nil
}
func (c *client) UpdateResources(ctx context.Context, containerID string, resources *Resources) error {
p, err := c.getProcess(containerID, InitProcessName)
if err != nil {
return err
}
// Go doesn't like the alias in 1.8, which means this needs to be
// platform specific
return p.(containerd.Task).Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
}
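// hostIDFromMap returns the host ID that the given container ID maps to, or 0
// if no mapping covers it. For example, with a mapping
// {ContainerID: 0, HostID: 100000, Size: 65536}, hostIDFromMap(0, ...) returns 100000.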
func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {
for _, m := range mp {
if id >= m.ContainerID && id <= m.ContainerID+m.Size-1 {
return int(m.HostID + id - m.ContainerID)
}
}
return 0
}
func getSpecUser(ociSpec *specs.Spec) (int, int) {
var (
uid int
gid int
)
for _, ns := range ociSpec.Linux.Namespaces {
if ns.Type == specs.UserNamespace {
uid = hostIDFromMap(0, ociSpec.Linux.UIDMappings)
gid = hostIDFromMap(0, ociSpec.Linux.GIDMappings)
break
}
}
return uid, gid
}
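// prepareBundleDir returns a bundle directory reachable by the remapped root
// user. With no remapping it simply creates bundleDir; otherwise, path
// components that are missing or not world-executable are re-created with a
// ".<uid>.<gid>" suffix owned by the remapped root, and the adjusted path is
// returned.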
func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) {
uid, gid := getSpecUser(ociSpec)
if uid == 0 && gid == 0 {
return bundleDir, idtools.MkdirAllAndChownNew(bundleDir, 0755, idtools.IDPair{0, 0})
}
p := string(filepath.Separator)
components := strings.Split(bundleDir, string(filepath.Separator))
for _, d := range components[1:] {
p = filepath.Join(p, d)
fi, err := os.Stat(p)
if err != nil && !os.IsNotExist(err) {
return "", err
}
if os.IsNotExist(err) || fi.Mode()&1 == 0 {
p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
if err := idtools.MkdirAndChown(p, 0700, idtools.IDPair{uid, gid}); err != nil && !os.IsExist(err) {
return "", err
}
}
}
return p, nil
}
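// newFIFOSet builds the "<processID>-stdin/-stdout/-stderr" FIFO paths under
// the bundle directory (stderr is omitted when a terminal is requested) and
// attaches a closer that removes them from disk.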
func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
config := cio.Config{
Terminal: withTerminal,
Stdout: filepath.Join(bundleDir, processID+"-stdout"),
}
paths := []string{config.Stdout}
if withStdin {
config.Stdin = filepath.Join(bundleDir, processID+"-stdin")
paths = append(paths, config.Stdin)
}
if !withTerminal {
config.Stderr = filepath.Join(bundleDir, processID+"-stderr")
paths = append(paths, config.Stderr)
}
closer := func() error {
for _, path := range paths {
if err := os.RemoveAll(path); err != nil {
logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", path, err)
}
}
return nil
}
return cio.NewFIFOSet(config, closer)
}

@@ -0,0 +1,55 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"fmt"
"path/filepath"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/windows/hcsshimtypes"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
func summaryFromInterface(i interface{}) (*Summary, error) {
switch pd := i.(type) {
case *hcsshimtypes.ProcessDetails:
return &Summary{
CreateTimestamp: pd.CreatedAt,
ImageName: pd.ImageName,
KernelTime100ns: pd.KernelTime_100Ns,
MemoryCommitBytes: pd.MemoryCommitBytes,
MemoryWorkingSetPrivateBytes: pd.MemoryWorkingSetPrivateBytes,
MemoryWorkingSetSharedBytes: pd.MemoryWorkingSetSharedBytes,
ProcessId: pd.ProcessID,
UserTime100ns: pd.UserTime_100Ns,
}, nil
default:
return nil, errors.Errorf("Unknown process details type %T", pd)
}
}
func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) {
return bundleDir, nil
}
func pipeName(containerID, processID, name string) string {
return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name)
}
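// newFIFOSet builds the named-pipe based stdio configuration for a process;
// unlike the Unix variant there is nothing to remove from disk, so no closer
// is attached.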
func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
containerID := filepath.Base(bundleDir)
config := cio.Config{
Terminal: withTerminal,
Stdout: pipeName(containerID, processID, "stdout"),
}
if withStdin {
config.Stdin = pipeName(containerID, processID, "stdin")
}
if !config.Terminal {
config.Stderr = pipeName(containerID, processID, "stderr")
}
return cio.NewFIFOSet(config, nil)
}

File diff suppressed because it is too large.

@@ -0,0 +1,13 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"errors"
"github.com/docker/docker/errdefs"
)
func newNotFoundError(err string) error { return errdefs.NotFound(errors.New(err)) }
func newInvalidParameterError(err string) error { return errdefs.InvalidParameter(errors.New(err)) }
func newConflictError(err string) error { return errdefs.Conflict(errors.New(err)) }

@@ -0,0 +1,44 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"io"
"sync"
"github.com/Microsoft/hcsshim"
"github.com/docker/docker/pkg/ioutils"
)
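// autoClosingReader closes the underlying ReadCloser exactly once, as soon as
// a Read returns an error (typically io.EOF).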
type autoClosingReader struct {
io.ReadCloser
sync.Once
}
func (r *autoClosingReader) Read(b []byte) (n int, err error) {
n, err = r.ReadCloser.Read(b)
if err != nil {
r.Once.Do(func() { r.ReadCloser.Close() })
}
return
}
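// createStdInCloser wraps the stdin pipe so that closing it also closes the
// process' stdin in HCS, tolerating errors caused by the process or compute
// system already being gone or shutting down.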
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {
return ioutils.NewWriteCloserWrapper(pipe, func() error {
if err := pipe.Close(); err != nil {
return err
}
err := process.CloseStdin()
if err != nil && !hcsshim.IsNotExist(err) && !hcsshim.IsAlreadyClosed(err) {
// This error will occur if the compute system is currently shutting down
if perr, ok := err.(*hcsshim.ProcessError); ok && perr.Err != hcsshim.ErrVmcomputeOperationInvalidState {
return err
}
}
return nil
})
}
func (p *process) Cleanup() error {
return nil
}

@@ -0,0 +1,35 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "sync"
type queue struct {
sync.Mutex
fns map[string]chan struct{}
}
func (q *queue) append(id string, f func()) {
q.Lock()
defer q.Unlock()
if q.fns == nil {
q.fns = make(map[string]chan struct{})
}
done := make(chan struct{})
fn, ok := q.fns[id]
q.fns[id] = done
go func() {
if ok {
<-fn
}
f()
close(done)
q.Lock()
if q.fns[id] == done {
delete(q.fns, id)
}
q.Unlock()
}()
}

@@ -0,0 +1,31 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"testing"
"time"
"github.com/gotestyourself/gotestyourself/assert"
)
func TestSerialization(t *testing.T) {
var (
q queue
serialization = 1
)
q.append("aaa", func() {
//simulate a long time task
time.Sleep(10 * time.Millisecond)
assert.Equal(t, serialization, 1)
serialization = 2
})
q.append("aaa", func() {
assert.Equal(t, serialization, 2)
serialization = 3
})
q.append("aaa", func() {
assert.Equal(t, serialization, 3)
serialization = 4
})
time.Sleep(20 * time.Millisecond)
}

@@ -0,0 +1,351 @@
// +build !windows

package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/BurntSushi/toml"
"github.com/containerd/containerd"
"github.com/containerd/containerd/server"
"github.com/docker/docker/pkg/system"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
maxConnectionRetryCount = 3
healthCheckTimeout = 3 * time.Second
shutdownTimeout = 15 * time.Second
configFile = "containerd.toml"
binaryName = "docker-containerd"
pidFile = "docker-containerd.pid"
)
type pluginConfigs struct {
Plugins map[string]interface{} `toml:"plugins"`
}
type remote struct {
sync.RWMutex
server.Config
daemonPid int
logger *logrus.Entry
daemonWaitCh chan struct{}
clients []*client
shutdownContext context.Context
shutdownCancel context.CancelFunc
shutdown bool
// Options
startDaemon bool
rootDir string
stateDir string
snapshotter string
pluginConfs pluginConfigs
}
// New creates a fresh instance of libcontainerd remote.
func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
defer func() {
if err != nil {
err = errors.Wrap(err, "Failed to connect to containerd")
}
}()
r := &remote{
rootDir: rootDir,
stateDir: stateDir,
Config: server.Config{
Root: filepath.Join(rootDir, "daemon"),
State: filepath.Join(stateDir, "daemon"),
},
pluginConfs: pluginConfigs{make(map[string]interface{})},
daemonPid: -1,
logger: logrus.WithField("module", "libcontainerd"),
}
r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())
rem = r
for _, option := range options {
if err = option.Apply(r); err != nil {
return
}
}
r.setDefaults()
if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
return
}
if r.startDaemon {
os.Remove(r.GRPC.Address)
if err = r.startContainerd(); err != nil {
return
}
defer func() {
if err != nil {
r.Cleanup()
}
}()
}
// This client is only used to monitor the connection to containerd
client, err := containerd.New(r.GRPC.Address)
if err != nil {
return
}
if _, err := client.Version(context.Background()); err != nil {
system.KillProcess(r.daemonPid)
return nil, errors.Wrapf(err, "unable to get containerd version")
}
go r.monitorConnection(client)
return r, nil
}
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
c := &client{
stateDir: r.stateDir,
logger: r.logger.WithField("namespace", ns),
namespace: ns,
backend: b,
containers: make(map[string]*container),
}
rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
if err != nil {
return nil, err
}
c.remote = rclient
go c.processEventStream(r.shutdownContext)
r.Lock()
r.clients = append(r.clients, c)
r.Unlock()
return c, nil
}
func (r *remote) Cleanup() {
if r.daemonPid != -1 {
r.shutdownCancel()
r.stopDaemon()
}
// cleanup some files
os.Remove(filepath.Join(r.stateDir, pidFile))
r.platformCleanup()
}
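// getContainerdPid reads the previously written pid file and returns the pid
// if that process is still alive, or -1 when containerd needs to be started.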
func (r *remote) getContainerdPid() (int, error) {
pidFile := filepath.Join(r.stateDir, pidFile)
f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
if err != nil {
if os.IsNotExist(err) {
return -1, nil
}
return -1, err
}
defer f.Close()
b := make([]byte, 8)
n, err := f.Read(b)
if err != nil && err != io.EOF {
return -1, err
}
if n > 0 {
pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
if err != nil {
return -1, err
}
if system.IsProcessAlive(int(pid)) {
return int(pid), nil
}
}
return -1, nil
}
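// getContainerdConfig writes the containerd server and plugin configuration
// as TOML to containerd.toml in the state dir and returns its path.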
func (r *remote) getContainerdConfig() (string, error) {
path := filepath.Join(r.stateDir, configFile)
f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
if err != nil {
return "", errors.Wrapf(err, "failed to open containerd config file at %s", path)
}
defer f.Close()
enc := toml.NewEncoder(f)
if err = enc.Encode(r.Config); err != nil {
return "", errors.Wrapf(err, "failed to encode general config")
}
if err = enc.Encode(r.pluginConfs); err != nil {
return "", errors.Wrapf(err, "failed to encode plugin configs")
}
return path, nil
}
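// startContainerd reuses a still-running containerd if the pid file points at
// a live process; otherwise it launches a new docker-containerd with the
// generated config, reaps it in a background goroutine, and records its pid
// on disk.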
func (r *remote) startContainerd() error {
pid, err := r.getContainerdPid()
if err != nil {
return err
}
if pid != -1 {
r.daemonPid = pid
logrus.WithField("pid", pid).
Infof("libcontainerd: %s is still running", binaryName)
return nil
}
configFile, err := r.getContainerdConfig()
if err != nil {
return err
}
args := []string{"--config", configFile}
cmd := exec.Command(binaryName, args...)
// redirect containerd logs to docker logs
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.SysProcAttr = containerdSysProcAttr()
// clear the NOTIFY_SOCKET from the env when starting containerd
cmd.Env = nil
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
cmd.Env = append(cmd.Env, e)
}
}
if err := cmd.Start(); err != nil {
return err
}
r.daemonWaitCh = make(chan struct{})
go func() {
// Reap our child when needed
if err := cmd.Wait(); err != nil {
r.logger.WithError(err).Errorf("containerd did not exit successfully")
}
close(r.daemonWaitCh)
}()
r.daemonPid = cmd.Process.Pid
err = ioutil.WriteFile(filepath.Join(r.stateDir, pidFile), []byte(fmt.Sprintf("%d", r.daemonPid)), 0660)
if err != nil {
system.KillProcess(r.daemonPid)
return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
}
logrus.WithField("pid", r.daemonPid).
Infof("libcontainerd: started new %s process", binaryName)
return nil
}
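// monitorConnection health-checks containerd every 500ms. Once enough
// consecutive failures accumulate and the daemon process is gone, it restarts
// containerd and reconnects the monitor and every existing client.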
func (r *remote) monitorConnection(monitor *containerd.Client) {
var transientFailureCount = 0
for {
select {
case <-r.shutdownContext.Done():
r.logger.Info("stopping healthcheck following graceful shutdown")
monitor.Close()
return
case <-time.After(500 * time.Millisecond):
}
ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
_, err := monitor.IsServing(ctx)
cancel()
if err == nil {
transientFailureCount = 0
continue
}
select {
case <-r.shutdownContext.Done():
r.logger.Info("stopping healthcheck following graceful shutdown")
monitor.Close()
return
default:
}
r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
if r.daemonPid == -1 {
continue
}
transientFailureCount++
if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
continue
}
transientFailureCount = 0
if system.IsProcessAlive(r.daemonPid) {
r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
// Try to get a stack trace
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
<-time.After(100 * time.Millisecond)
system.KillProcess(r.daemonPid)
}
<-r.daemonWaitCh
monitor.Close()
os.Remove(r.GRPC.Address)
if err := r.startContainerd(); err != nil {
r.logger.WithError(err).Error("failed restarting containerd")
continue
}
newMonitor, err := containerd.New(r.GRPC.Address)
if err != nil {
r.logger.WithError(err).Error("failed connect to containerd")
continue
}
monitor = newMonitor
var wg sync.WaitGroup
for _, c := range r.clients {
wg.Add(1)
go func(c *client) {
defer wg.Done()
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
c.remote.Close()
remote, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(c.namespace))
if err != nil {
r.logger.WithError(err).Error("failed to connect to containerd")
// TODO: Better way to handle this?
// This *shouldn't* happen, but this could wind up where the daemon
// is not able to communicate with an eventually up containerd
return
}
c.setRemote(remote)
}(c)
wg.Wait()
}
}
}

@@ -0,0 +1,54 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"os"
"path/filepath"
"syscall"
"time"
"github.com/docker/docker/pkg/system"
)
const (
sockFile = "docker-containerd.sock"
debugSockFile = "docker-containerd-debug.sock"
)
func (r *remote) setDefaults() {
if r.GRPC.Address == "" {
r.GRPC.Address = filepath.Join(r.stateDir, sockFile)
}
if r.Debug.Address == "" {
r.Debug.Address = filepath.Join(r.stateDir, debugSockFile)
}
if r.Debug.Level == "" {
r.Debug.Level = "info"
}
if r.OOMScore == 0 {
r.OOMScore = -999
}
if r.snapshotter == "" {
r.snapshotter = "overlay"
}
}
func (r *remote) stopDaemon() {
// Ask the daemon to quit
syscall.Kill(r.daemonPid, syscall.SIGTERM)
// Wait up to 15secs for it to stop
for i := time.Duration(0); i < shutdownTimeout; i += time.Second {
if !system.IsProcessAlive(r.daemonPid) {
break
}
time.Sleep(time.Second)
}
if system.IsProcessAlive(r.daemonPid) {
r.logger.WithField("pid", r.daemonPid).Warn("daemon didn't stop within 15 secs, killing it")
syscall.Kill(r.daemonPid, syscall.SIGKILL)
}
}
func (r *remote) platformCleanup() {
os.Remove(filepath.Join(r.stateDir, sockFile))
}

@@ -0,0 +1,141 @@
// +build !windows

package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "fmt"
// WithRemoteAddr sets the external containerd socket to connect to.
func WithRemoteAddr(addr string) RemoteOption {
return rpcAddr(addr)
}
type rpcAddr string
func (a rpcAddr) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.GRPC.Address = string(a)
return nil
}
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
}
// WithRemoteAddrUser sets the uid and gid to create the RPC address with
func WithRemoteAddrUser(uid, gid int) RemoteOption {
return rpcUser{uid, gid}
}
type rpcUser struct {
uid int
gid int
}
func (u rpcUser) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.GRPC.UID = u.uid
remote.GRPC.GID = u.gid
return nil
}
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
}
// WithStartDaemon defines if libcontainerd should also run the containerd daemon.
func WithStartDaemon(start bool) RemoteOption {
return startDaemon(start)
}
type startDaemon bool
func (s startDaemon) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.startDaemon = bool(s)
return nil
}
return fmt.Errorf("WithStartDaemon option not supported for this remote")
}
// WithLogLevel defines which log level to start containerd with.
// This only makes sense if WithStartDaemon() was set to true.
func WithLogLevel(lvl string) RemoteOption {
return logLevel(lvl)
}
type logLevel string
func (l logLevel) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Debug.Level = string(l)
return nil
}
return fmt.Errorf("WithDebugLog option not supported for this remote")
}
// WithDebugAddress defines at which location the debug GRPC connection
// should be made
func WithDebugAddress(addr string) RemoteOption {
return debugAddress(addr)
}
type debugAddress string
func (d debugAddress) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Debug.Address = string(d)
return nil
}
return fmt.Errorf("WithDebugAddress option not supported for this remote")
}
// WithMetricsAddress defines at which location the metrics connection
// should be made
func WithMetricsAddress(addr string) RemoteOption {
return metricsAddress(addr)
}
type metricsAddress string
func (m metricsAddress) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Metrics.Address = string(m)
return nil
}
return fmt.Errorf("WithMetricsAddress option not supported for this remote")
}
// WithSnapshotter defines which snapshotter driver should be used
func WithSnapshotter(name string) RemoteOption {
return snapshotter(name)
}
type snapshotter string
func (s snapshotter) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.snapshotter = string(s)
return nil
}
return fmt.Errorf("WithSnapshotter option not supported for this remote")
}
// WithPlugin allows configuring a containerd plugin.
// Configuration values passed need to be quoted if quotes are needed in
// the toml format.
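// For example (the plugin name and value below are illustrative only):
//
//	New(rootDir, stateDir, WithPlugin("linux", map[string]interface{}{
//		"shim_debug": true,
//	}))
//
// would be rendered as a [plugins.linux] section with shim_debug = true in
// the generated containerd.toml.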
func WithPlugin(name string, conf interface{}) RemoteOption {
return pluginConf{
name: name,
conf: conf,
}
}
type pluginConf struct {
// Name is the name of the plugin
name string
conf interface{}
}
func (p pluginConf) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.pluginConfs.Plugins[p.name] = p.conf
return nil
}
return fmt.Errorf("WithPlugin option not supported for this remote")
}

@@ -0,0 +1,34 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "fmt"
// WithOOMScore defines the oom_score_adj to set for the containerd process.
func WithOOMScore(score int) RemoteOption {
return oomScore(score)
}
type oomScore int
func (o oomScore) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.OOMScore = int(o)
return nil
}
return fmt.Errorf("WithOOMScore option not supported for this remote")
}
// WithSubreaper sets whether containerd should register itself as a
// subreaper
func WithSubreaper(reap bool) RemoteOption {
return subreaper(reap)
}
type subreaper bool
func (s subreaper) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.NoSubreaper = !bool(s)
return nil
}
return fmt.Errorf("WithSubreaper option not supported for this remote")
}

@@ -0,0 +1,50 @@
// +build remote_daemon

package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"os"
)
const (
grpcPipeName = `\\.\pipe\docker-containerd-containerd`
debugPipeName = `\\.\pipe\docker-containerd-debug`
)
func (r *remote) setDefaults() {
if r.GRPC.Address == "" {
r.GRPC.Address = grpcPipeName
}
if r.Debug.Address == "" {
r.Debug.Address = debugPipeName
}
if r.Debug.Level == "" {
r.Debug.Level = "info"
}
if r.snapshotter == "" {
r.snapshotter = "naive" // TODO(mlaventure): switch to "windows" once implemented
}
}
func (r *remote) stopDaemon() {
p, err := os.FindProcess(r.daemonPid)
if err != nil {
r.logger.WithField("pid", r.daemonPid).Warn("could not find daemon process")
return
}
if err = p.Kill(); err != nil {
r.logger.WithError(err).WithField("pid", r.daemonPid).Warn("could not kill daemon process")
return
}
_, err = p.Wait()
if err != nil {
r.logger.WithError(err).WithField("pid", r.daemonPid).Warn("wait for daemon process")
return
}
}
func (r *remote) platformCleanup() {
// Nothing to do
}

@@ -0,0 +1,59 @@
// +build windows

package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"sync"
"github.com/sirupsen/logrus"
)
type remote struct {
sync.RWMutex
logger *logrus.Entry
clients []*client
// Options
rootDir string
stateDir string
}
// New creates a fresh instance of libcontainerd remote.
func New(rootDir, stateDir string, options ...RemoteOption) (Remote, error) {
return &remote{
logger: logrus.WithField("module", "libcontainerd"),
rootDir: rootDir,
stateDir: stateDir,
}, nil
}
type client struct {
sync.Mutex
rootDir string
stateDir string
backend Backend
logger *logrus.Entry
eventQ queue
containers map[string]*container
}
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
c := &client{
rootDir: r.rootDir,
stateDir: r.stateDir,
backend: b,
logger: r.logger.WithField("namespace", ns),
containers: make(map[string]*container),
}
r.Lock()
r.clients = append(r.clients, c)
r.Unlock()
return c, nil
}
func (r *remote) Cleanup() {
// Nothing to do
}

@@ -0,0 +1,108 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"context"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/opencontainers/runtime-spec/specs-go"
)
// EventType represents a possible event from libcontainerd
type EventType string
// Event constants used when reporting events
const (
EventUnknown EventType = "unknown"
EventExit EventType = "exit"
EventOOM EventType = "oom"
EventCreate EventType = "create"
EventStart EventType = "start"
EventExecAdded EventType = "exec-added"
EventExecStarted EventType = "exec-started"
EventPaused EventType = "paused"
EventResumed EventType = "resumed"
)
// Status represents the current status of a container
type Status string
// Possible container statuses
const (
// Running indicates the process is currently executing
StatusRunning Status = "running"
// Created indicates the process has been created within containerd but the
// user's defined process has not started
StatusCreated Status = "created"
// Stopped indicates that the process has run and exited
StatusStopped Status = "stopped"
// Paused indicates that the process is currently paused
StatusPaused Status = "paused"
// Pausing indicates that the process is currently switching from a
// running state into a paused state
StatusPausing Status = "pausing"
// Unknown indicates that we could not determine the status from the runtime
StatusUnknown Status = "unknown"
)
// Remote on Linux defines the access point to the containerd grpc API.
// Remote on Windows is largely an unimplemented interface as there is
// no remote containerd.
type Remote interface {
// Client returns a new Client instance connected with given Backend.
NewClient(namespace string, backend Backend) (Client, error)
// Cleanup stops containerd if it was started by libcontainerd.
// Note this is not used on Windows as there is no remote containerd.
Cleanup()
}
// RemoteOption allows configuring parameters of remotes.
// This is unused on Windows.
type RemoteOption interface {
Apply(Remote) error
}
// EventInfo contains the event info
type EventInfo struct {
ContainerID string
ProcessID string
Pid uint32
ExitCode uint32
ExitedAt time.Time
OOMKilled bool
Error error
}
// Backend defines callbacks that the client of the library needs to implement.
type Backend interface {
ProcessEvent(containerID string, event EventType, ei EventInfo) error
}
// Client provides access to containerd features.
type Client interface {
Version(ctx context.Context) (containerd.Version, error)
Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, err error)
Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error)
SignalProcess(ctx context.Context, containerID, processID string, signal int) error
Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error)
ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error
CloseStdin(ctx context.Context, containerID, processID string) error
Pause(ctx context.Context, containerID string) error
Resume(ctx context.Context, containerID string) error
Stats(ctx context.Context, containerID string) (*Stats, error)
ListPids(ctx context.Context, containerID string) ([]uint32, error)
Summary(ctx context.Context, containerID string) ([]Summary, error)
DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
Delete(ctx context.Context, containerID string) error
Status(ctx context.Context, containerID string) (Status, error)
UpdateResources(ctx context.Context, containerID string, resources *Resources) error
CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error
}
// StdioCallback is called to connect a container or process stdio.
type StdioCallback func(io *cio.DirectIO) (cio.IO, error)

@@ -0,0 +1,30 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"time"
"github.com/containerd/cgroups"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// Summary is not used on Linux
type Summary struct{}
// Stats holds metrics properties as returned by containerd
type Stats struct {
Read time.Time
Metrics *cgroups.Metrics
}
func interfaceToStats(read time.Time, v interface{}) *Stats {
return &Stats{
Metrics: v.(*cgroups.Metrics),
Read: read,
}
}
// Resources defines updatable container resource values. TODO: it must match containerd upcoming API
type Resources specs.LinuxResources
// Checkpoints contains the details of a checkpoint
type Checkpoints struct{}

@@ -0,0 +1,42 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"time"
"github.com/Microsoft/hcsshim"
opengcs "github.com/Microsoft/opengcs/client"
)
// Summary contains a ProcessList item from HCS to support `top`
type Summary hcsshim.ProcessListItem
// Stats contains statistics from HCS
type Stats struct {
Read time.Time
HCSStats *hcsshim.Statistics
}
func interfaceToStats(read time.Time, v interface{}) *Stats {
return &Stats{
HCSStats: v.(*hcsshim.Statistics),
Read: read,
}
}
// Resources defines updatable container resource values.
type Resources struct{}
// LCOWOption is a CreateOption required for LCOW configuration
type LCOWOption struct {
Config *opengcs.Config
}
// Checkpoint holds the details of a checkpoint (not supported in windows)
type Checkpoint struct {
Name string
}
// Checkpoints contains the details of a checkpoint
type Checkpoints struct {
Checkpoints []*Checkpoint
}

@@ -0,0 +1,12 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "syscall"
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
// containerd
func containerdSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setsid: true,
Pdeathsig: syscall.SIGKILL,
}
}

@@ -0,0 +1,46 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"strings"
"syscall"
opengcs "github.com/Microsoft/opengcs/client"
)
// setupEnvironmentVariables converts a string array of environment variables
// into a map as required by the HCS. Source array is in format [k1=v1] [k2=v2] etc.
func setupEnvironmentVariables(a []string) map[string]string {
r := make(map[string]string)
for _, s := range a {
arr := strings.SplitN(s, "=", 2)
if len(arr) == 2 {
r[arr[0]] = arr[1]
}
}
return r
}
// Apply for the LCOW option is a no-op.
func (s *LCOWOption) Apply(interface{}) error {
return nil
}
// debugGCS is a dirty hack for debugging Linux Utility VMs. It simply
// runs a bunch of commands inside the UVM, but seriously aids in advanced debugging.
func (c *container) debugGCS() {
if c == nil || c.isWindows || c.hcsContainer == nil {
return
}
cfg := opengcs.Config{
Uvm: c.hcsContainer,
UvmTimeoutSeconds: 600,
}
cfg.DebugGCS()
}
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
// containerd
func containerdSysProcAttr() *syscall.SysProcAttr {
return nil
}

@@ -0,0 +1,13 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"testing"
)
func TestEnvironmentParsing(t *testing.T) {
env := []string{"foo=bar", "car=hat", "a=b=c"}
result := setupEnvironmentVariables(env)
if len(result) != 3 || result["foo"] != "bar" || result["car"] != "hat" || result["a"] != "b=c" {
t.Fatalf("Expected map[foo:bar car:hat a:b=c], got %v", result)
}
}