Add events api to shim
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
462bdd7669
commit
07c81ccac4
10 changed files with 663 additions and 211 deletions
|
@ -107,16 +107,8 @@ func (e *execProcess) Status() int {
|
|||
|
||||
func (e *execProcess) Exited(status int) {
|
||||
e.status = status
|
||||
}
|
||||
|
||||
func (e *execProcess) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *execProcess) Delete(context context.Context) error {
|
||||
e.Wait()
|
||||
e.io.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *execProcess) Resize(ws runc.WinSize) error {
|
||||
|
|
|
@ -23,7 +23,7 @@ type initProcess struct {
|
|||
pid int
|
||||
}
|
||||
|
||||
func newInitProcess(context context.Context, r *apishim.CreateRequest) (process, error) {
|
||||
func newInitProcess(context context.Context, r *apishim.CreateRequest) (*initProcess, error) {
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -67,7 +67,6 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||
}
|
||||
dest(fw, fr)
|
||||
}
|
||||
|
||||
f, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
|
||||
|
|
|
@ -1,18 +1,10 @@
|
|||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
runc "github.com/crosbymichael/go-runc"
|
||||
)
|
||||
import runc "github.com/crosbymichael/go-runc"
|
||||
|
||||
type process interface {
|
||||
// Pid returns the pid for the process
|
||||
Pid() int
|
||||
// Start starts the user's defined process inside
|
||||
Start(context.Context) error
|
||||
// Delete deletes the process and closes all open pipes
|
||||
Delete(context.Context) error
|
||||
// Resize resizes the process console
|
||||
Resize(ws runc.WinSize) error
|
||||
// Exited sets the exit status for the process
|
||||
|
|
|
@ -13,16 +13,20 @@ import (
|
|||
|
||||
var emptyResponse = &google_protobuf.Empty{}
|
||||
|
||||
// NewService returns a new shim service that can be used via GRPC
|
||||
func NewService() *Service {
|
||||
return &Service{
|
||||
processes: make(map[int]process),
|
||||
events: make(chan *apishim.Event, 2048),
|
||||
}
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
initPid int
|
||||
mu sync.Mutex
|
||||
processes map[int]process
|
||||
initProcess *initProcess
|
||||
id string
|
||||
mu sync.Mutex
|
||||
processes map[int]process
|
||||
events chan *apishim.Event
|
||||
}
|
||||
|
||||
func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishim.CreateResponse, error) {
|
||||
|
@ -31,8 +35,10 @@ func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishi
|
|||
return nil, err
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.initProcess = process
|
||||
pid := process.Pid()
|
||||
s.initPid, s.processes[pid] = pid, process
|
||||
s.processes[pid] = process
|
||||
s.id = r.ID
|
||||
s.mu.Unlock()
|
||||
return &apishim.CreateResponse{
|
||||
Pid: uint32(pid),
|
||||
|
@ -40,37 +46,28 @@ func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishi
|
|||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context, r *apishim.StartRequest) (*google_protobuf.Empty, error) {
|
||||
s.mu.Lock()
|
||||
p := s.processes[s.initPid]
|
||||
s.mu.Unlock()
|
||||
if err := p.Start(ctx); err != nil {
|
||||
if err := s.initProcess.Start(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return emptyResponse, nil
|
||||
}
|
||||
|
||||
func (s *Service) Delete(ctx context.Context, r *apishim.DeleteRequest) (*apishim.DeleteResponse, error) {
|
||||
s.mu.Lock()
|
||||
p, ok := s.processes[int(r.Pid)]
|
||||
s.mu.Unlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("process does not exist %d", r.Pid)
|
||||
}
|
||||
if err := p.Delete(ctx); err != nil {
|
||||
if err := s.initProcess.Delete(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.mu.Lock()
|
||||
delete(s.processes, int(r.Pid))
|
||||
delete(s.processes, s.initProcess.pid)
|
||||
s.mu.Unlock()
|
||||
return &apishim.DeleteResponse{
|
||||
ExitStatus: uint32(p.Status()),
|
||||
ExitStatus: uint32(s.initProcess.Status()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) Exec(ctx context.Context, r *apishim.ExecRequest) (*apishim.ExecResponse, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
process, err := newExecProcess(ctx, r, s.processes[s.initPid].(*initProcess))
|
||||
process, err := newExecProcess(ctx, r, s.initProcess)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -98,10 +95,25 @@ func (s *Service) Pty(ctx context.Context, r *apishim.PtyRequest) (*google_proto
|
|||
return emptyResponse, nil
|
||||
}
|
||||
|
||||
func (s *Service) Events(r *apishim.EventsRequest, stream apishim.Shim_EventsServer) error {
|
||||
for e := range s.events {
|
||||
if err := stream.Send(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ProcessExit(e utils.Exit) error {
|
||||
s.mu.Lock()
|
||||
if p, ok := s.processes[e.Pid]; ok {
|
||||
p.Exited(e.Status)
|
||||
s.events <- &apishim.Event{
|
||||
Type: apishim.EventType_EXIT,
|
||||
ID: s.id,
|
||||
Pid: uint32(p.Pid()),
|
||||
ExitStatus: uint32(e.Status),
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue