From ac3cc32dbc37d3b78e6c1f8317a9f952e506042f Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 7 Dec 2016 10:44:05 -0800 Subject: [PATCH] Complete basic support for ctr run Signed-off-by: Kenfe-Mickael Laventure --- cmd/ctr/run.go | 172 ++++++++++++++++++++++++++++++--- execution/container.go | 11 ++- execution/executor.go | 13 ++- execution/executors/oci/oci.go | 92 +++++++++++++++--- execution/service.go | 59 ++++++----- 5 files changed, 290 insertions(+), 57 deletions(-) diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index 63bd9ef..278ee90 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -2,11 +2,23 @@ package main import ( "fmt" + "io" + "io/ioutil" + "log" + "net" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" gocontext "context" - "github.com/BurntSushi/toml" "github.com/docker/containerd/api/execution" + "github.com/tonistiigi/fifo" "github.com/urfave/cli" ) @@ -37,11 +49,11 @@ var runCommand = cli.Command{ }, }, Action: func(context *cli.Context) error { - var config runConfig - if _, err := toml.DecodeFile(context.Args().First(), &config); err != nil { - return err - } - id := context.Args().Get(1) + // var config runConfig + // if _, err := toml.DecodeFile(context.Args().First(), &config); err != nil { + // return err + // } + id := context.Args().First() if id == "" { return fmt.Errorf("container id must be provided") } @@ -53,32 +65,168 @@ var runCommand = cli.Command{ if err != nil { return err } - cr, err := executionService.Create(gocontext.Background(), &execution.CreateContainerRequest{ - ID: id, - BundlePath: context.String("bundle"), - }) + + tmpDir, err := getTempDir(id) if err != nil { return err } + defer os.RemoveAll(tmpDir) + + crOpts := &execution.CreateContainerRequest{ + ID: id, + BundlePath: context.String("bundle"), + Stdin: filepath.Join(tmpDir, "stdin"), + Stdout: filepath.Join(tmpDir, "stdout"), + Stderr: filepath.Join(tmpDir, "stderr"), + } + + fwg, err := prepareStdio(crOpts.Stdin, crOpts.Stdout, crOpts.Stderr) + if err != nil { + return err + } + + cr, err := executionService.Create(gocontext.Background(), crOpts) + if err != nil { + return err + } + if _, err := containerService.Start(gocontext.Background(), &execution.StartContainerRequest{ ID: cr.Container.ID, }); err != nil { return err } + // wait for it to die + for { + gcr, err := containerService.Get(gocontext.Background(), &execution.GetContainerRequest{ + ID: cr.Container.ID, + }) + if err != nil { + return err + } + if gcr.Container.Status != execution.Status_RUNNING { + break + } + time.Sleep(100 * time.Millisecond) + } + if _, err := executionService.Delete(gocontext.Background(), &execution.DeleteContainerRequest{ ID: cr.Container.ID, }); err != nil { return err } + + // Ensure we read all io + fwg.Wait() + return nil }, } +var grpcConn *grpc.ClientConn + +func prepareStdio(in, out, err string) (*sync.WaitGroup, error) { + var ( + wg sync.WaitGroup + + dst io.Writer + src io.Reader + close func() + ) + + for _, f := range []struct { + name string + flags int + src bool + reader io.Reader + writer io.Writer + }{ + {in, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK, false, os.Stdin, nil}, + {out, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK, true, nil, os.Stdout}, + {err, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK, true, nil, os.Stderr}, + } { + ff, err := fifo.OpenFifo(gocontext.Background(), f.name, f.flags, 0700) + if err != nil { + return nil, err + } + defer func(c io.Closer) { + if err != nil { + c.Close() + } + }(ff) + + if f.src { + src = ff + dst = f.writer + close = func() { + ff.Close() + wg.Done() + } + wg.Add(1) + } else { + src = f.reader + dst = ff + close = func() { ff.Close() } + } + + go func(dst io.Writer, src io.Reader, close func()) { + io.Copy(dst, src) + close() + }(dst, src, close) + } + + return &wg, nil +} + +func getGRPCConnection(context *cli.Context) (*grpc.ClientConn, error) { + if grpcConn != nil { + return grpcConn, nil + } + + bindSocket := context.GlobalString("socket") + // reset the logger for grpc to log to dev/null so that it does not mess with our stdio + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) + dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)} + dialOpts = append(dialOpts, + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", bindSocket, timeout) + }, + )) + + conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...) + if err != nil { + return nil, err + } + + grpcConn = conn + return grpcConn, nil +} + func getExecutionService(context *cli.Context) (execution.ExecutionServiceClient, error) { - return nil, nil + conn, err := getGRPCConnection(context) + if err != nil { + return nil, err + } + return execution.NewExecutionServiceClient(conn), nil } func getContainerService(context *cli.Context) (execution.ContainerServiceClient, error) { - return nil, nil + conn, err := getGRPCConnection(context) + if err != nil { + return nil, err + } + return execution.NewContainerServiceClient(conn), nil +} + +func getTempDir(id string) (string, error) { + err := os.MkdirAll(filepath.Join(os.TempDir(), "ctr"), 0700) + if err != nil { + return "", err + } + + tmpDir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "ctr"), fmt.Sprintf("%s-", id)) + if err != nil { + return "", err + } + return tmpDir, nil } diff --git a/execution/container.go b/execution/container.go index 9ff5753..8c04149 100644 --- a/execution/container.go +++ b/execution/container.go @@ -2,7 +2,7 @@ package execution import "fmt" -func NewContainer(stateRoot, id, bundle string) (*Container, error) { +func NewContainer(stateRoot, id, bundle, status string) (*Container, error) { stateDir, err := NewStateDir(stateRoot, id) if err != nil { return nil, err @@ -11,16 +11,18 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) { id: id, bundle: bundle, stateDir: stateDir, + status: status, processes: make(map[string]Process), }, nil } -func LoadContainer(dir StateDir, id, bundle string, initPid int64) *Container { +func LoadContainer(dir StateDir, id, bundle, status string, initPid int64) *Container { return &Container{ id: id, stateDir: dir, bundle: bundle, initPid: initPid, + status: status, processes: make(map[string]Process), } } @@ -30,6 +32,7 @@ type Container struct { bundle string stateDir StateDir initPid int64 + status string processes map[string]Process } @@ -38,6 +41,10 @@ func (c *Container) ID() string { return c.id } +func (c *Container) Status() string { + return c.status +} + func (c *Container) Bundle() string { return c.bundle } diff --git a/execution/executor.go b/execution/executor.go index 65b85a6..963d5db 100644 --- a/execution/executor.go +++ b/execution/executor.go @@ -1,7 +1,6 @@ package execution import ( - "io" "os" "github.com/opencontainers/runtime-spec/specs-go" @@ -9,16 +8,16 @@ import ( type CreateOpts struct { Bundle string - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer + Stdin string + Stdout string + Stderr string } type CreateProcessOpts struct { Spec specs.Process - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer + Stdin string + Stdout string + Stderr string } type Executor interface { diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 942345c..f11f27b 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -19,6 +19,7 @@ func New(root string) *OCIRuntime { runc: &runc.Runc{ Root: filepath.Join(root, "runc"), }, + ios: make(map[string]runc.IO), } } @@ -26,17 +27,61 @@ type OCIRuntime struct { // root holds runtime state information for the containers root string runc *runc.Runc + + // We need to keep track of the created IO for + ios map[string]runc.IO +} + +func closeRuncIO(io runc.IO) { + if io.Stdin != nil { + io.Stdin.(*os.File).Close() + } + if io.Stdout != nil { + io.Stdout.(*os.File).Close() + } + if io.Stderr != nil { + io.Stderr.(*os.File).Close() + } +} + +func getRuncIO(stdin, stdout, stderr string) (io runc.IO, err error) { + defer func() { + if err != nil { + closeRuncIO(io) + } + }() + if io.Stdin, err = os.OpenFile(stdin, os.O_RDONLY, 0); err != nil { + return + } + if io.Stdout, err = os.OpenFile(stdout, os.O_WRONLY, 0); err != nil { + return + } + if io.Stderr, err = os.OpenFile(stderr, os.O_WRONLY, 0); err != nil { + return + } + return } func (r *OCIRuntime) Create(id string, o execution.CreateOpts) (container *execution.Container, err error) { - if container, err = execution.NewContainer(r.root, id, o.Bundle); err != nil { + io, err := getRuncIO(o.Stdin, o.Stdout, o.Stderr) + if err != nil { return nil, err } defer func() { if err != nil { - container.StateDir().Delete() + closeRuncIO(io) } }() + + if container, err = execution.NewContainer(r.root, id, o.Bundle, "created"); err != nil { + return nil, err + } + defer func(c *execution.Container) { + if err != nil { + c.StateDir().Delete() + } + }(container) + initDir, err := container.StateDir().NewProcess() if err != nil { return nil, err @@ -44,18 +89,20 @@ func (r *OCIRuntime) Create(id string, o execution.CreateOpts) (container *execu pidFile := filepath.Join(initDir, "pid") err = r.runc.Create(id, o.Bundle, &runc.CreateOpts{ PidFile: pidFile, - IO: runc.IO{ - Stdin: o.Stdin, - Stdout: o.Stdout, - Stderr: o.Stderr, - }, + IO: io, }) if err != nil { return nil, err } + defer func() { + if err != nil { + r.runc.Kill(id, int(syscall.SIGKILL)) + r.runc.Delete(id) + } + }() + pid, err := runc.ReadPidFile(pidFile) if err != nil { - // TODO: kill the container if we are going to return return nil, err } process, err := newProcess(filepath.Base(initDir), pid) @@ -65,6 +112,8 @@ func (r *OCIRuntime) Create(id string, o execution.CreateOpts) (container *execu container.AddProcess(process, true) + r.ios[id] = io + return container, nil } @@ -85,6 +134,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { execution.StateDir(filepath.Join(r.root, runcC.ID)), runcC.ID, runcC.Bundle, + runcC.Status, int64(runcC.Pid), ) @@ -135,10 +185,13 @@ func (r *OCIRuntime) Load(id string) (*execution.Container, error) { } func (r *OCIRuntime) Delete(c *execution.Container) error { - if err := r.runc.Delete(c.ID()); err != nil { + id := c.ID() + if err := r.runc.Delete(id); err != nil { return err } c.StateDir().Delete() + closeRuncIO(r.ios[id]) + delete(r.ios, id) return nil } @@ -151,6 +204,16 @@ func (r *OCIRuntime) Resume(c *execution.Container) error { } func (r *OCIRuntime) StartProcess(c *execution.Container, o execution.CreateProcessOpts) (p execution.Process, err error) { + io, err := getRuncIO(o.Stdin, o.Stdout, o.Stderr) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + closeRuncIO(io) + } + }() + processStateDir, err := c.StateDir().NewProcess() if err != nil { return nil, err @@ -165,11 +228,7 @@ func (r *OCIRuntime) StartProcess(c *execution.Container, o execution.CreateProc if err := r.runc.ExecProcess(c.ID(), o.Spec, &runc.ExecOpts{ PidFile: pidFile, Detach: true, - IO: runc.IO{ - Stdin: o.Stdin, - Stdout: o.Stdout, - Stderr: o.Stderr, - }, + IO: io, }); err != nil { return nil, err } @@ -185,6 +244,8 @@ func (r *OCIRuntime) StartProcess(c *execution.Container, o execution.CreateProc c.AddProcess(process, false) + r.ios[fmt.Sprintf("%s-%s", c.ID(), process.ID())] = io + return process, nil } @@ -197,5 +258,8 @@ func (r *OCIRuntime) SignalProcess(c *execution.Container, id string, sig os.Sig } func (r *OCIRuntime) DeleteProcess(c *execution.Container, id string) error { + ioID := fmt.Sprintf("%s-%s", c.ID(), id) + closeRuncIO(r.ios[ioID]) + delete(r.ios, ioID) return c.StateDir().DeleteProcess(id) } diff --git a/execution/service.go b/execution/service.go index 8f00912..1cbddd0 100644 --- a/execution/service.go +++ b/execution/service.go @@ -10,6 +10,8 @@ import ( "golang.org/x/net/context" ) +var emptyResponse = &google_protobuf.Empty{} + func New(executor Executor) (*Service, error) { return &Service{ executor: executor, @@ -23,12 +25,13 @@ type Service struct { func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { // TODO: write io and bundle path to dir - // TODO: open IOs + var err error + container, err := s.executor.Create(r.ID, CreateOpts{ Bundle: r.BundlePath, - // Stdin: r.Stdin, - // Stdout: r.Stdout, - // Stderr: r.Stderr, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, }) if err != nil { return nil, err @@ -44,13 +47,13 @@ func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*a func (s *Service) Delete(ctx context.Context, r *api.DeleteContainerRequest) (*google_protobuf.Empty, error) { container, err := s.executor.Load(r.ID) if err != nil { - return nil, err + return emptyResponse, err } if err = s.executor.Delete(container); err != nil { - return nil, err + return emptyResponse, err } - return nil, nil + return emptyResponse, nil } func (s *Service) List(ctx context.Context, r *api.ListContainersRequest) (*api.ListContainersResponse, error) { @@ -75,7 +78,7 @@ func (s *Service) Get(ctx context.Context, r *api.GetContainerRequest) (*api.Get } func (s *Service) Update(ctx context.Context, r *api.UpdateContainerRequest) (*google_protobuf.Empty, error) { - return nil, nil + return emptyResponse, nil } func (s *Service) Pause(ctx context.Context, r *api.PauseContainerRequest) (*google_protobuf.Empty, error) { @@ -83,7 +86,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseContainerRequest) (*goo if err != nil { return nil, err } - return nil, s.executor.Pause(container) + return emptyResponse, s.executor.Pause(container) } func (s *Service) Resume(ctx context.Context, r *api.ResumeContainerRequest) (*google_protobuf.Empty, error) { @@ -91,7 +94,7 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeContainerRequest) (*g if err != nil { return nil, err } - return nil, s.executor.Resume(container) + return emptyResponse, s.executor.Resume(container) } func (s *Service) Start(ctx context.Context, r *api.StartContainerRequest) (*google_protobuf.Empty, error) { @@ -99,7 +102,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartContainerRequest) (*goo if err != nil { return nil, err } - return nil, s.executor.Start(container) + return emptyResponse, s.executor.Start(container) } func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) (*api.StartProcessResponse, error) { @@ -110,12 +113,11 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) // TODO: generate spec var spec specs.Process - // TODO: open IOs process, err := s.executor.StartProcess(container, CreateProcessOpts{ - Spec: spec, - // Stdin: r.Stdin, - // Stdout: r.Stdout, - // Stderr: r.Stderr, + Spec: spec, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, }) if err != nil { return nil, err @@ -145,24 +147,24 @@ func (s *Service) GetProcess(ctx context.Context, r *api.GetProcessRequest) (*ap func (s *Service) SignalProcess(ctx context.Context, r *api.SignalProcessRequest) (*google_protobuf.Empty, error) { container, err := s.executor.Load(r.Container.ID) if err != nil { - return nil, err + return emptyResponse, err } process := container.GetProcess(r.Process.ID) if process == nil { return nil, fmt.Errorf("Make me a constant! Process not foumd!") } - return nil, process.Signal(syscall.Signal(r.Signal)) + return emptyResponse, process.Signal(syscall.Signal(r.Signal)) } func (s *Service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest) (*google_protobuf.Empty, error) { container, err := s.executor.Load(r.Container.ID) if err != nil { - return nil, err + return emptyResponse, err } if err := s.executor.DeleteProcess(container, r.Process.ID); err != nil { - return nil, err + return emptyResponse, err } - return nil, nil + return emptyResponse, nil } func (s *Service) ListProcesses(ctx context.Context, r *api.ListProcessesRequest) (*api.ListProcessesResponse, error) { @@ -182,10 +184,23 @@ var ( ) func toGRPCContainer(container *Container) *api.Container { - return &api.Container{ + c := &api.Container{ ID: container.ID(), BundlePath: container.Bundle(), } + status := container.Status() + switch status { + case "created": + c.Status = api.Status_CREATED + case "running": + c.Status = api.Status_RUNNING + case "stopped": + c.Status = api.Status_STOPPED + case "paused": + c.Status = api.Status_PAUSED + } + + return c } func toGRPCProcesses(processes []Process) []*api.Process {