move work on execution service
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
		
							parent
							
								
									dd5f74edec
								
							
						
					
					
						commit
						c857213b4c
					
				
					 15 changed files with 596 additions and 1016 deletions
				
			
		|  | @ -1,85 +1,29 @@ | |||
| package execution | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 
 | ||||
| 	specs "github.com/opencontainers/runtime-spec/specs-go" | ||||
| ) | ||||
| 
 | ||||
| type ContainerController interface { | ||||
| 	Pause(*Container) error | ||||
| 	Resume(*Container) error | ||||
| 	Status(*Container) (Status, error) | ||||
| 	Process(c *Container, pid int) (*Process, error) | ||||
| 	Processes(*Container) ([]*Process, error) | ||||
| } | ||||
| 
 | ||||
| func NewContainer(c ContainerController) *Container { | ||||
| 	return &Container{ | ||||
| 		controller: c, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type Container struct { | ||||
| 	ID         string | ||||
| 	Bundle     string | ||||
| 	Root       string | ||||
| 	controller ContainerController | ||||
| 	ID       string | ||||
| 	Bundle   string | ||||
| 	StateDir StateDir | ||||
| 
 | ||||
| 	processes map[int]*Process | ||||
| 	processes map[string]Process | ||||
| } | ||||
| 
 | ||||
| func (c *Container) Process(pid int) (*Process, error) { | ||||
| 	for _, p := range c.processes { | ||||
| 		if p.Pid == pid { | ||||
| 			return p, nil | ||||
| 		} | ||||
| 	} | ||||
| 	return nil, fmt.Errorf("todo make real error") | ||||
| func (c *Container) AddProcess(p Process) { | ||||
| 	c.processes[p.ID()] = p | ||||
| } | ||||
| 
 | ||||
| func (c *Container) CreateProcess(spec *specs.Process) (*Process, error) { | ||||
| 	if err := os.MkdirAll(filepath.Join(c.Root, c.getNextProcessID()), 0660); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	process := &Process{ | ||||
| 		Spec:       spec, | ||||
| 		controller: c.controller, | ||||
| 	} | ||||
| 	c.processes = append(c.processes, process) | ||||
| 	return process, nil | ||||
| func (c *Container) GetProcess(id string) Process { | ||||
| 	return c.processes[id] | ||||
| } | ||||
| 
 | ||||
| func (c *Container) DeleteProcess(pid int) error { | ||||
| 	process, ok := c.processes[pid] | ||||
| 	if !ok { | ||||
| 		return fmt.Errorf("it no here") | ||||
| 	} | ||||
| 	if process.Status() != Stopped { | ||||
| 		return fmt.Errorf("tototoit not stopped ok?") | ||||
| 	} | ||||
| 	delete(c.processes, pid) | ||||
| 	return os.RemoveAll(p.Root) | ||||
| func (c *Container) RemoveProcess(id string) { | ||||
| 	delete(c.processes, id) | ||||
| } | ||||
| 
 | ||||
| func (c *Container) Processes() []*Process { | ||||
| 	var out []*Process | ||||
| func (c *Container) Processes() []Process { | ||||
| 	var out []Process | ||||
| 	for _, p := range c.processes { | ||||
| 		out = append(out, p) | ||||
| 	} | ||||
| 	return out | ||||
| } | ||||
| 
 | ||||
| func (c *Container) Pause() error { | ||||
| 	return c.controller.Pause(c) | ||||
| } | ||||
| 
 | ||||
| func (c *Container) Resume() error { | ||||
| 	return c.controller.Resume(c) | ||||
| } | ||||
| 
 | ||||
| func (c *Container) Status() (Status, error) { | ||||
| 	return c.controller.Status(c) | ||||
| } | ||||
|  |  | |||
|  | @ -1,6 +1,11 @@ | |||
| package execution | ||||
| 
 | ||||
| import "io" | ||||
| import ( | ||||
| 	"io" | ||||
| 	"os" | ||||
| 
 | ||||
| 	"github.com/opencontainers/runtime-spec/specs-go" | ||||
| ) | ||||
| 
 | ||||
| type CreateOpts struct { | ||||
| 	Bundle string | ||||
|  | @ -9,9 +14,23 @@ type CreateOpts struct { | |||
| 	Stderr io.Writer | ||||
| } | ||||
| 
 | ||||
| type CreateProcessOpts struct { | ||||
| 	Spec   specs.Process | ||||
| 	Stdin  io.Reader | ||||
| 	Stdout io.Writer | ||||
| 	Stderr io.Writer | ||||
| } | ||||
| 
 | ||||
| type Executor interface { | ||||
| 	Create(id string, o CreateOpts) (*Container, error) | ||||
| 	Pause(*Container) error | ||||
| 	Resume(*Container) error | ||||
| 	Status(*Container) (Status, error) | ||||
| 	List() ([]*Container, error) | ||||
| 	Load(id string) (*Container, error) | ||||
| 	Delete(string) error | ||||
| 	Delete(*Container) error | ||||
| 
 | ||||
| 	StartProcess(*Container, CreateProcessOpts) (Process, error) | ||||
| 	SignalProcess(*Container, os.Signal) error | ||||
| 	DeleteProcess(*Container, string) error | ||||
| } | ||||
|  |  | |||
|  | @ -1,13 +1,13 @@ | |||
| package executors | ||||
| 
 | ||||
| import "github.com/docker/containerd" | ||||
| import "github.com/docker/containerd/execution" | ||||
| 
 | ||||
| var executors = make(map[string]func() containerd.Executor) | ||||
| var executors = make(map[string]func() execution.Executor) | ||||
| 
 | ||||
| func Register(name string, e func() containerd.Executor) { | ||||
| func Register(name string, e func() execution.Executor) { | ||||
| 	executors[name] = e | ||||
| } | ||||
| 
 | ||||
| func Get(name string) func() containerd.Executor { | ||||
| func Get(name string) func() execution.Executor { | ||||
| 	return executors[name] | ||||
| } | ||||
|  |  | |||
|  | @ -1,23 +0,0 @@ | |||
| package oci | ||||
| 
 | ||||
| import "github.com/docker/containerd/execution" | ||||
| 
 | ||||
| type containerController struct { | ||||
| 	root string | ||||
| } | ||||
| 
 | ||||
| func (c *containerController) Process(container *execution.Container, pid int) (*execution.Process, error) { | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (c *containerController) Processes(container *execution.Container) ([]*execution.Process, error) { | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (c *containerController) Pause(container *execution.Container) error { | ||||
| 	return command(c.root, "pause", container.ID).Run() | ||||
| } | ||||
| 
 | ||||
| func (c *containerController) Resume(container *execution.Container) error { | ||||
| 	return command(c.root, "resume", container.ID).Run() | ||||
| } | ||||
|  | @ -1,16 +1,14 @@ | |||
| package oci | ||||
| 
 | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"os/exec" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| 	"syscall" | ||||
| 
 | ||||
| 	"github.com/docker/containerd" | ||||
| 	"github.com/crosbymichael/go-runc" | ||||
| 	"github.com/docker/containerd/execution" | ||||
| 	"github.com/docker/containerd/executors" | ||||
| ) | ||||
|  | @ -25,107 +23,178 @@ func init() { | |||
| func New(root string) *OCIRuntime { | ||||
| 	return &OCIRuntime{ | ||||
| 		root: root, | ||||
| 		Runc: &runc.Runc{ | ||||
| 			Root: filepath.Join(root, "runc"), | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type OCIRuntime struct { | ||||
| 	// root holds runtime state information for the containers | ||||
| 	// launched by the runtime | ||||
| 	root string | ||||
| 	runc *runc.Runc | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) Create(id string, o execution.CreateOpts) (*execution.Container, error) { | ||||
| 	var err error | ||||
| 
 | ||||
| 	stateDir, err := NewStateDir(r.root, id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	initStateDir, err := stateDir.NewProcess() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// /run/runc/redis/1/pid | ||||
| 	pidFile := filepath.Join(r.root, id, "1", "pid") | ||||
| 	cmd := command(r.root, "create", | ||||
| 		"--pid-file", pidFile, | ||||
| 		"--bundle", o.Bundle, | ||||
| 		id, | ||||
| 	) | ||||
| 	cmd.Stdin, cmd.Stdout, cmd.Stderr = o.Stdin, o.Stdout, o.Stderr | ||||
| 	if err := cmd.Run(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	// TODO: kill on error | ||||
| 	data, err := ioutil.ReadFile(pidFile) | ||||
| 	pidFile := filepath.Join(initStateDir, "pid") | ||||
| 	err = r.runc.Create(id, o.Bundle, &runc.CreateOpts{ | ||||
| 		Pidfile: pidfile, | ||||
| 		Stdin:   o.Stdin, | ||||
| 		Stdout:  o.Stdout, | ||||
| 		Stderr:  o.Stderr, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	pid, err := strconv.Atoi(string(data)) | ||||
| 	pid, err := runc.ReadPifFile(pidfile) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	container := execution.NewContainer(r) | ||||
| 	container.ID = id | ||||
| 	container.Root = filepath.Join(r.root, id) | ||||
| 	container.Bundle = o.Bundle | ||||
| 	process, err := container.CreateProcess(nil) | ||||
| 	process, err := newProcess(pid) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	process.Pid = pid | ||||
| 	process.Stdin = o.Stdin | ||||
| 	process.Stdout = o.Stdout | ||||
| 	process.Stderr = o.Stderr | ||||
| 
 | ||||
| 	container := &execution.Container{ | ||||
| 		ID:       id, | ||||
| 		Bundle:   o.Bundle, | ||||
| 		StateDir: stateDir, | ||||
| 	} | ||||
| 	container.AddProcess(process) | ||||
| 
 | ||||
| 	return container, nil | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) Load(id string) (containerd.ProcessDelegate, error) { | ||||
| 	data, err := r.Command("state", id).Output() | ||||
| func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { | ||||
| 	container := &execution.Container{ | ||||
| 		ID:       runcC.ID, | ||||
| 		Bundle:   runcC.Bundle, | ||||
| 		StateDir: StateDir(filepath.Join(r.root, runcC.ID)), | ||||
| 	} | ||||
| 
 | ||||
| 	process, err := newProcess(runcC.Pid) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	var s state | ||||
| 	if err := json.Unmarshal(data, &s); err != nil { | ||||
| 	container.AddProcess(process) | ||||
| 
 | ||||
| 	// /run/containerd/container-id/processess/process-id | ||||
| 	dirs, err := ioutil.ReadDir(filepath.Join(container.Root)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newProcess(s.Pid) | ||||
| 
 | ||||
| 	return container, nil | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) Delete(id string) error { | ||||
| 	return command(r.root, "delete", id).Run() | ||||
| func (r *OCIRuntime) List() ([]*execution.Container, error) { | ||||
| 	runcCs, err := r.runc.List() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	containers := make([]*execution.Container) | ||||
| 	for _, c := range runcCs { | ||||
| 		container, err := r.load(c) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		containers = append(containers, container) | ||||
| 	} | ||||
| 
 | ||||
| 	return containers, nil | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) Exec(c *containerd.Container, p *containerd.Process) (containerd.ProcessDelegate, error) { | ||||
| 	f, err := ioutil.TempFile(filepath.Join(r.root, c.ID()), "process") | ||||
| func (r *OCIRuntime) Load(id string) (*execution.Container, error) { | ||||
| 	runcC, err := r.runc.State(id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	path := f.Name() | ||||
| 	pidFile := fmt.Sprintf("%s/%s.pid", filepath.Join(r.root, c.ID()), filepath.Base(path)) | ||||
| 	err = json.NewEncoder(f).Encode(p.Spec()) | ||||
| 	f.Close() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	cmd := r.Command("exec", "--detach", "--process", path, "--pid-file", pidFile, c.ID()) | ||||
| 	cmd.Stdin, cmd.Stdout, cmd.Stderr = p.Stdin, p.Stdout, p.Stderr | ||||
| 	if err := cmd.Run(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	data, err := ioutil.ReadFile(pidFile) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	i, err := strconv.Atoi(string(data)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newProcess(i) | ||||
| 
 | ||||
| 	return r.load(runcC) | ||||
| } | ||||
| 
 | ||||
| type state struct { | ||||
| 	ID          string            `json:"id"` | ||||
| 	Pid         int               `json:"pid"` | ||||
| 	Status      string            `json:"status"` | ||||
| 	Bundle      string            `json:"bundle"` | ||||
| 	Rootfs      string            `json:"rootfs"` | ||||
| 	Created     time.Time         `json:"created"` | ||||
| 	Annotations map[string]string `json:"annotations"` | ||||
| func (r *OCIRuntime) Delete(c *execution.Container) error { | ||||
| 	if err := r.runc.Delete(c.ID); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	c.StateDir.Delete() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func command(root, args ...string) *exec.Cmd { | ||||
| 	return exec.Command("runc", append( | ||||
| 		[]string{"--root", root}, | ||||
| 		args...)...) | ||||
| func (r *OCIRuntime) Pause(c *execution.Container) error { | ||||
| 	return r.runc.Pause(c.ID) | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) Resume(c *execution.Container) error { | ||||
| 	return r.runc.Resume(c.ID) | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) StartProcess(c *execution.Container, o CreateProcessOpts) (execution.Process, error) { | ||||
| 	var err error | ||||
| 
 | ||||
| 	processStateDir, err := c.StateDir.NewProcess() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if err != nil { | ||||
| 			c.StateDir.DeleteProcess(filepath.Base(processStateDir)) | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	pidFile := filepath.Join(processStateDir, id) | ||||
| 	err := r.runc.ExecProcess(c.ID, o.spec, &runc.ExecOpts{ | ||||
| 		PidFile: pidfile, | ||||
| 		Detach:  true, | ||||
| 		Stdin:   o.stdin, | ||||
| 		Stdout:  o.stdout, | ||||
| 		Stderr:  o.stderr, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	pid, err := runc.ReadPidFile(pidfile) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	process, err := newProcess(pid) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	container.AddProcess(process) | ||||
| 
 | ||||
| 	return process, nil | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) SignalProcess(c *execution.Container, id string, sig os.Signal) error { | ||||
| 	process := c.GetProcess(id) | ||||
| 	if process == nil { | ||||
| 		return fmt.Errorf("Make a Process Not Found error") | ||||
| 	} | ||||
| 	return syscall.Kill(int(process.Pid()), os.Signal) | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) GetProcess(c *execution.Container, id string) process { | ||||
| 	return c.GetProcess(id) | ||||
| } | ||||
| 
 | ||||
| func (r *OCIRuntime) DeleteProcess(c *execution.Container, id string) error { | ||||
| 	c.StateDir.DeleteProcess(id) | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
|  | @ -3,9 +3,11 @@ package oci | |||
| import ( | ||||
| 	"os" | ||||
| 	"syscall" | ||||
| 
 | ||||
| 	"github.com/docker/containerd/execution" | ||||
| ) | ||||
| 
 | ||||
| func newProcess(pid int) (*process, error) { | ||||
| func newProcess(pid int) (execution.Process, error) { | ||||
| 	proc, err := os.FindProcess(pid) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|  |  | |||
|  | @ -1,41 +1,18 @@ | |||
| package execution | ||||
| 
 | ||||
| import ( | ||||
| 	"io" | ||||
| 	"os" | ||||
| 
 | ||||
| 	specs "github.com/opencontainers/runtime-spec/specs-go" | ||||
| 	"github.com/opencontainers/runtime-spec/specs-go" | ||||
| ) | ||||
| 
 | ||||
| type ProcessController interface { | ||||
| 	Start(*Process) error | ||||
| 	Status(*Process) (Status, error) | ||||
| 	Wait(*Process) (uint32, error) | ||||
| 	Signal(*Process, os.Signal) error | ||||
| } | ||||
| type Process interface { | ||||
| 	ID() string | ||||
| 	Pid() int64 | ||||
| 	Spec() *specs.Process | ||||
| 
 | ||||
| type Process struct { | ||||
| 	Pid    int | ||||
| 	Spec   *specs.Process | ||||
| 	Stdin  io.Reader | ||||
| 	Stdout io.Writer | ||||
| 	Stderr io.Writer | ||||
| 
 | ||||
| 	controller ProcessController | ||||
| } | ||||
| 
 | ||||
| func (p *Process) Status() (Status, error) { | ||||
| 	return p.controller.Status(p) | ||||
| } | ||||
| 
 | ||||
| func (p *Process) Wait() (uint32, error) { | ||||
| 	return p.controller.Wait(p) | ||||
| } | ||||
| 
 | ||||
| func (p *Process) Signal(s os.Signal) error { | ||||
| 	return p.controller.Signal(p, s) | ||||
| } | ||||
| 
 | ||||
| func (p *Process) Start() error { | ||||
| 	return p.controller.Start(p) | ||||
| 	Start() error | ||||
| 	Status() (Status, error) | ||||
| 	Wait() (uint32, error) | ||||
| 	Signal(os.Signal) error | ||||
| } | ||||
|  |  | |||
							
								
								
									
										68
									
								
								execution/statedir.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								execution/statedir.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,68 @@ | |||
| package execution | ||||
| 
 | ||||
| import ( | ||||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| ) | ||||
| 
 | ||||
| const processesDir = "processes" | ||||
| 
 | ||||
| type StateDir string | ||||
| 
 | ||||
| func NewStateDir(root, id string) (StateDir, error) { | ||||
| 	path := filepath.Join(root, id) | ||||
| 	err := os.Mkdir(path, 0700) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	err = os.Mkdir(filepath.Join(path, processesDir), 0700) | ||||
| 	if err != nil { | ||||
| 		os.RemoveAll(path) | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return StateDir(path), err | ||||
| } | ||||
| 
 | ||||
| func (s StateDir) Delete() error { | ||||
| 	return os.RemoveAll(string(s)) | ||||
| } | ||||
| 
 | ||||
| func (s StateDir) NewProcess(id string) (string, error) { | ||||
| 	// TODO: generate id | ||||
| 	newPath := filepath.Join(string(s), "1") | ||||
| 	err := os.Mkdir(newPath, 0755) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return newPath, nil | ||||
| } | ||||
| 
 | ||||
| func (s StateDir) ProcessDir(id string) string { | ||||
| 	return filepath.Join(string(s), id) | ||||
| } | ||||
| 
 | ||||
| func (s StateDir) DeleteProcess(id string) error { | ||||
| 	return os.RemoveAll(filepath.Join(string(s), id)) | ||||
| } | ||||
| 
 | ||||
| func (s StateDir) Processes() ([]string, error) { | ||||
| 	basepath := filepath.Join(string(s), processesDir) | ||||
| 	dirs, err := ioutil.ReadDir(basepath) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	paths := make([]string, 0) | ||||
| 	for _, d := range dirs { | ||||
| 
 | ||||
| 		if d.IsDir() { | ||||
| 			paths = append(paths, filepath.Join(basepath, d.Name())) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return paths, nil | ||||
| } | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue