Add state rpc to shim

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-01-26 15:09:59 -08:00
parent ead53658cc
commit f431bf4ad4
7 changed files with 856 additions and 189 deletions

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,7 @@ service Shim {
rpc Exec(ExecRequest) returns (ExecResponse); rpc Exec(ExecRequest) returns (ExecResponse);
rpc Pty(PtyRequest) returns (google.protobuf.Empty); rpc Pty(PtyRequest) returns (google.protobuf.Empty);
rpc Events(EventsRequest) returns (stream Event); rpc Events(EventsRequest) returns (stream Event);
rpc State(StateRequest) returns (StateResponse);
} }
message CreateRequest { message CreateRequest {
@ -33,6 +34,7 @@ message StartRequest {
} }
message DeleteRequest { message DeleteRequest {
uint32 pid = 1;
} }
message DeleteResponse { message DeleteResponse {
@ -40,11 +42,11 @@ message DeleteResponse {
} }
message ExecRequest { message ExecRequest {
string id = 1 [(gogoproto.customname) = "ID"]; bool terminal = 1;
bool terminal = 2; string stdin = 2;
string stdin = 3; string stdout = 3;
string stdout = 4; string stderr = 4;
string stderr = 5; string selinux_label = 5;
User user = 6; User user = 6;
repeated string args = 7; repeated string args = 7;
repeated string env = 8; repeated string env = 8;
@ -53,7 +55,6 @@ message ExecRequest {
repeated Rlimit rlimits = 11; repeated Rlimit rlimits = 11;
bool no_new_privileges = 12; bool no_new_privileges = 12;
string apparmor_profile = 13; string apparmor_profile = 13;
string selinux_label = 14;
} }
message User { message User {
@ -95,3 +96,21 @@ message Event {
uint32 pid = 3; uint32 pid = 3;
uint32 exit_status = 4; uint32 exit_status = 4;
} }
message StateRequest {
}
message StateResponse {
string id = 1 [(gogoproto.customname) = "ID"];
repeated Process processes = 2;
}
enum State {
STOPPED = 0;
RUNNING = 1;
}
message Process {
uint32 pid = 1;
State state = 2;
}

View File

@ -1,12 +1,14 @@
package main package main
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"os/exec" "os"
"syscall" "strconv"
"time" "time"
gocontext "context" gocontext "context"
@ -14,6 +16,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"github.com/Sirupsen/logrus"
"github.com/crosbymichael/console" "github.com/crosbymichael/console"
"github.com/docker/containerd/api/shim" "github.com/docker/containerd/api/shim"
"github.com/urfave/cli" "github.com/urfave/cli"
@ -46,16 +49,8 @@ var shimCommand = cli.Command{
shimStartCommand, shimStartCommand,
shimDeleteCommand, shimDeleteCommand,
shimEventsCommand, shimEventsCommand,
}, shimStateCommand,
Action: func(context *cli.Context) error { shimExecCommand,
cmd := exec.Command("containerd-shim", "--debug")
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Setpgid = true
if err := cmd.Start(); err != nil {
return err
}
fmt.Println("new shim started @ ./shim.sock")
return nil
}, },
} }
@ -150,7 +145,13 @@ var shimDeleteCommand = cli.Command{
if err != nil { if err != nil {
return err return err
} }
r, err := service.Delete(gocontext.Background(), &shim.DeleteRequest{}) pid, err := strconv.Atoi(context.Args().First())
if err != nil {
return err
}
r, err := service.Delete(gocontext.Background(), &shim.DeleteRequest{
Pid: uint32(pid),
})
if err != nil { if err != nil {
return err return err
} }
@ -159,6 +160,99 @@ var shimDeleteCommand = cli.Command{
}, },
} }
var shimStateCommand = cli.Command{
Name: "state",
Usage: "get the state of all the processes of the shim",
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
r, err := service.State(gocontext.Background(), &shim.StateRequest{})
if err != nil {
return err
}
data, err := json.Marshal(r)
if err != nil {
return err
}
buf := bytes.NewBuffer(nil)
if err := json.Indent(buf, data, " ", " "); err != nil {
return err
}
buf.WriteTo(os.Stdout)
return nil
},
}
var shimExecCommand = cli.Command{
Name: "exec",
Usage: "exec a new process in the shim's container",
Flags: append(fifoFlags,
cli.BoolFlag{
Name: "attach,a",
Usage: "stay attached to the container and open the fifos",
},
cli.StringSliceFlag{
Name: "env,e",
Usage: "add environment vars",
Value: &cli.StringSlice{},
},
cli.StringFlag{
Name: "cwd",
Usage: "current working directory",
},
),
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
tty := context.Bool("tty")
wg, err := prepareStdio(context.String("stdin"), context.String("stdout"), context.String("stderr"), tty)
if err != nil {
return err
}
rq := &shim.ExecRequest{
Args: []string(context.Args()),
Env: context.StringSlice("env"),
Cwd: context.String("cwd"),
Stdin: context.String("stdin"),
Stdout: context.String("stdout"),
Stderr: context.String("stderr"),
Terminal: tty,
}
r, err := service.Exec(gocontext.Background(), rq)
if err != nil {
return err
}
fmt.Printf("exec running with pid %d\n", r.Pid)
if context.Bool("attach") {
logrus.Info("attaching")
if tty {
current := console.Current()
defer current.Reset()
if err := current.SetRaw(); err != nil {
return err
}
size, err := current.Size()
if err != nil {
return err
}
if _, err := service.Pty(gocontext.Background(), &shim.PtyRequest{
Pid: r.Pid,
Width: uint32(size.Width),
Height: uint32(size.Height),
}); err != nil {
return err
}
}
wg.Wait()
}
return nil
},
}
var shimEventsCommand = cli.Command{ var shimEventsCommand = cli.Command{
Name: "events", Name: "events",
Usage: "get events for a shim", Usage: "get events for a shim",

View File

@ -14,6 +14,7 @@ import (
gocontext "context" gocontext "context"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/api/execution" "github.com/docker/containerd/api/execution"
"github.com/tonistiigi/fifo" "github.com/tonistiigi/fifo"
"github.com/urfave/cli" "github.com/urfave/cli"
@ -38,6 +39,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
}(f) }(f)
go func(w io.WriteCloser) { go func(w io.WriteCloser) {
io.Copy(w, os.Stdin) io.Copy(w, os.Stdin)
logrus.Info("stdin copy finished")
w.Close() w.Close()
}(f) }(f)
@ -54,6 +56,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) { go func(r io.ReadCloser) {
io.Copy(os.Stdout, r) io.Copy(os.Stdout, r)
r.Close() r.Close()
logrus.Info("stdout copy finished")
wg.Done() wg.Done()
}(f) }(f)
@ -71,6 +74,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) { go func(r io.ReadCloser) {
io.Copy(os.Stderr, r) io.Copy(os.Stderr, r)
r.Close() r.Close()
logrus.Info("stderr copy finished")
wg.Done() wg.Done()
}(f) }(f)
} }

View File

@ -16,7 +16,7 @@ import (
type execProcess struct { type execProcess struct {
sync.WaitGroup sync.WaitGroup
id string id int
console console.Console console console.Console
io runc.IO io runc.IO
status int status int
@ -25,19 +25,19 @@ type execProcess struct {
parent *initProcess parent *initProcess
} }
func newExecProcess(context context.Context, r *apishim.ExecRequest, parent *initProcess) (process, error) { func newExecProcess(context context.Context, r *apishim.ExecRequest, parent *initProcess, id int) (process, error) {
cwd, err := os.Getwd() cwd, err := os.Getwd()
if err != nil { if err != nil {
return nil, err return nil, err
} }
e := &execProcess{ e := &execProcess{
id: r.ID, id: id,
parent: parent, parent: parent,
} }
var ( var (
socket *runc.ConsoleSocket socket *runc.ConsoleSocket
io runc.IO io runc.IO
pidfile = filepath.Join(cwd, fmt.Sprintf("%s.pid", r.ID)) pidfile = filepath.Join(cwd, fmt.Sprintf("%d.pid", id))
) )
if r.Terminal { if r.Terminal {
if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil { if socket, err = runc.NewConsoleSocket(filepath.Join(cwd, "pty.sock")); err != nil {
@ -56,9 +56,8 @@ func newExecProcess(context context.Context, r *apishim.ExecRequest, parent *ini
ConsoleSocket: socket, ConsoleSocket: socket,
IO: io, IO: io,
Detach: true, Detach: true,
Tty: socket != nil,
} }
if err := parent.runc.Exec(context, r.ID, processFromRequest(r), opts); err != nil { if err := parent.runc.Exec(context, parent.id, processFromRequest(r), opts); err != nil {
return nil, err return nil, err
} }
pid, err := runc.ReadPidFile(opts.PidFile) pid, err := runc.ReadPidFile(opts.PidFile)
@ -70,13 +69,15 @@ func newExecProcess(context context.Context, r *apishim.ExecRequest, parent *ini
} }
func processFromRequest(r *apishim.ExecRequest) specs.Process { func processFromRequest(r *apishim.ExecRequest) specs.Process {
var user specs.User
if r.User != nil {
user.UID = r.User.Uid
user.GID = r.User.Gid
user.AdditionalGids = r.User.AdditionalGids
}
return specs.Process{ return specs.Process{
Terminal: r.Terminal, Terminal: r.Terminal,
User: specs.User{ User: user,
UID: r.User.Uid,
GID: r.User.Gid,
AdditionalGids: r.User.AdditionalGids,
},
Rlimits: rlimits(r.Rlimits), Rlimits: rlimits(r.Rlimits),
Args: r.Args, Args: r.Args,
Env: r.Env, Env: r.Env,
@ -115,6 +116,10 @@ func (e *execProcess) Exited(status int) {
} }
} }
func (e *execProcess) Delete(ctx context.Context) error {
return nil
}
func (e *execProcess) Resize(ws console.WinSize) error { func (e *execProcess) Resize(ws console.WinSize) error {
if e.console == nil { if e.console == nil {
return nil return nil

View File

@ -1,6 +1,10 @@
package shim package shim
import "github.com/crosbymichael/console" import (
"context"
"github.com/crosbymichael/console"
)
type process interface { type process interface {
// Pid returns the pid for the process // Pid returns the pid for the process
@ -11,4 +15,5 @@ type process interface {
Exited(status int) Exited(status int)
// Status returns the exit status // Status returns the exit status
Status() int Status() int
Delete(context.Context) error
} }

View File

@ -3,6 +3,7 @@ package shim
import ( import (
"fmt" "fmt"
"sync" "sync"
"syscall"
"github.com/crosbymichael/console" "github.com/crosbymichael/console"
apishim "github.com/docker/containerd/api/shim" apishim "github.com/docker/containerd/api/shim"
@ -27,6 +28,7 @@ type Service struct {
mu sync.Mutex mu sync.Mutex
processes map[int]process processes map[int]process
events chan *apishim.Event events chan *apishim.Event
execID int
} }
func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishim.CreateResponse, error) { func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishim.CreateResponse, error) {
@ -63,21 +65,28 @@ func (s *Service) Start(ctx context.Context, r *apishim.StartRequest) (*google_p
} }
func (s *Service) Delete(ctx context.Context, r *apishim.DeleteRequest) (*apishim.DeleteResponse, error) { func (s *Service) Delete(ctx context.Context, r *apishim.DeleteRequest) (*apishim.DeleteResponse, error) {
if err := s.initProcess.Delete(ctx); err != nil { s.mu.Lock()
p, ok := s.processes[int(r.Pid)]
s.mu.Unlock()
if !ok {
return nil, fmt.Errorf("process does not exist %d", r.Pid)
}
if err := p.Delete(ctx); err != nil {
return nil, err return nil, err
} }
s.mu.Lock() s.mu.Lock()
delete(s.processes, s.initProcess.pid) delete(s.processes, p.Pid())
s.mu.Unlock() s.mu.Unlock()
return &apishim.DeleteResponse{ return &apishim.DeleteResponse{
ExitStatus: uint32(s.initProcess.Status()), ExitStatus: uint32(p.Status()),
}, nil }, nil
} }
func (s *Service) Exec(ctx context.Context, r *apishim.ExecRequest) (*apishim.ExecResponse, error) { func (s *Service) Exec(ctx context.Context, r *apishim.ExecRequest) (*apishim.ExecResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
process, err := newExecProcess(ctx, r, s.initProcess) s.execID++
process, err := newExecProcess(ctx, r, s.initProcess, s.execID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -122,6 +131,29 @@ func (s *Service) Events(r *apishim.EventsRequest, stream apishim.Shim_EventsSer
return nil return nil
} }
func (s *Service) State(ctx context.Context, r *apishim.StateRequest) (*apishim.StateResponse, error) {
o := &apishim.StateResponse{
ID: s.id,
Processes: []*apishim.Process{},
}
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
state := apishim.State_RUNNING
if err := syscall.Kill(p.Pid(), 0); err != nil {
if err != syscall.ESRCH {
return nil, err
}
state = apishim.State_STOPPED
}
o.Processes = append(o.Processes, &apishim.Process{
Pid: uint32(p.Pid()),
State: state,
})
}
return o, nil
}
func (s *Service) ProcessExit(e utils.Exit) error { func (s *Service) ProcessExit(e utils.Exit) error {
s.mu.Lock() s.mu.Lock()
if p, ok := s.processes[e.Pid]; ok { if p, ok := s.processes[e.Pid]; ok {