Move runtime implementation types to pkg
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
779cb69e6d
commit
c24abdde1b
8 changed files with 60 additions and 51 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/containerd"
|
"github.com/docker/containerd"
|
||||||
|
"github.com/docker/containerd/runtime"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/opencontainers/specs"
|
"github.com/opencontainers/specs"
|
||||||
)
|
)
|
||||||
|
@ -51,8 +52,8 @@ func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
e := containerd.NewEvent(containerd.UpdateContainerEventType)
|
e := containerd.NewEvent(containerd.UpdateContainerEventType)
|
||||||
e.ID = id
|
e.ID = id
|
||||||
e.State = &containerd.State{
|
e.State = &runtime.State{
|
||||||
Status: containerd.Status(string(state.Status)),
|
Status: runtime.Status(string(state.Status)),
|
||||||
}
|
}
|
||||||
s.supervisor.SendEvent(e)
|
s.supervisor.SendEvent(e)
|
||||||
if err := <-e.Err; err != nil {
|
if err := <-e.Err; err != nil {
|
||||||
|
@ -214,7 +215,7 @@ func writeContainers(w http.ResponseWriter, e *containerd.Event) error {
|
||||||
return json.NewEncoder(w).Encode(&state)
|
return json.NewEncoder(w).Encode(&state)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createProcess(in containerd.Process) *Process {
|
func createProcess(in runtime.Process) *Process {
|
||||||
pid, err := in.Pid()
|
pid, err := in.Pid()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("error", err).Error("get process pid")
|
logrus.WithField("error", err).Error("get process pid")
|
||||||
|
@ -248,7 +249,7 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {
|
||||||
e := containerd.NewEvent(containerd.StartContainerEventType)
|
e := containerd.NewEvent(containerd.StartContainerEventType)
|
||||||
e.ID = id
|
e.ID = id
|
||||||
e.BundlePath = c.BundlePath
|
e.BundlePath = c.BundlePath
|
||||||
e.Stdio = &containerd.Stdio{
|
e.Stdio = &runtime.Stdio{
|
||||||
Stderr: c.Stderr,
|
Stderr: c.Stderr,
|
||||||
Stdout: c.Stdout,
|
Stdout: c.Stdout,
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package containerd
|
package containerd
|
||||||
|
|
||||||
import "github.com/Sirupsen/logrus"
|
import (
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/containerd/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
type DeleteEvent struct {
|
type DeleteEvent struct {
|
||||||
s *Supervisor
|
s *Supervisor
|
||||||
|
@ -16,7 +19,7 @@ func (h *DeleteEvent) Handle(e *Event) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *DeleteEvent) deleteContainer(container Container) error {
|
func (h *DeleteEvent) deleteContainer(container runtime.Container) error {
|
||||||
delete(h.s.containers, container.ID())
|
delete(h.s.containers, container.ID())
|
||||||
return container.Delete()
|
return container.Delete()
|
||||||
}
|
}
|
||||||
|
|
12
event.go
12
event.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/containerd/runtime"
|
||||||
"github.com/opencontainers/specs"
|
"github.com/opencontainers/specs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,23 +28,18 @@ func NewEvent(t EventType) *Event {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Stdio struct {
|
|
||||||
Stderr string `json:"stderr,omitempty"`
|
|
||||||
Stdout string `json:"stdout,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Type EventType `json:"type"`
|
Type EventType `json:"type"`
|
||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
ID string `json:"id,omitempty"`
|
ID string `json:"id,omitempty"`
|
||||||
BundlePath string `json:"bundlePath,omitempty"`
|
BundlePath string `json:"bundlePath,omitempty"`
|
||||||
Stdio *Stdio `json:"stdio,omitempty"`
|
Stdio *runtime.Stdio `json:"stdio,omitempty"`
|
||||||
Pid int `json:"pid,omitempty"`
|
Pid int `json:"pid,omitempty"`
|
||||||
Status int `json:"status,omitempty"`
|
Status int `json:"status,omitempty"`
|
||||||
Signal os.Signal `json:"signal,omitempty"`
|
Signal os.Signal `json:"signal,omitempty"`
|
||||||
Process *specs.Process `json:"process,omitempty"`
|
Process *specs.Process `json:"process,omitempty"`
|
||||||
State *State `json:"state,omitempty"`
|
State *runtime.State `json:"state,omitempty"`
|
||||||
Containers []Container `json:"-"`
|
Containers []runtime.Container `json:"-"`
|
||||||
Err chan error `json:"-"`
|
Err chan error `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package containerd
|
package runtime
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
@ -22,6 +22,11 @@ type State struct {
|
||||||
Status Status `json:"status,omitempty"`
|
Status Status `json:"status,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Stdio struct {
|
||||||
|
Stderr string `json:"stderr,omitempty"`
|
||||||
|
Stdout string `json:"stdout,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
type Container interface {
|
type Container interface {
|
||||||
// ID returns the container ID
|
// ID returns the container ID
|
||||||
ID() string
|
ID() string
|
|
@ -1,4 +1,4 @@
|
||||||
package containerd
|
package runtime
|
||||||
|
|
||||||
import "github.com/opencontainers/specs"
|
import "github.com/opencontainers/specs"
|
||||||
|
|
|
@ -8,11 +8,12 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
goruntime "runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/docker/containerd/runtime"
|
||||||
"github.com/opencontainers/runc/libcontainer"
|
"github.com/opencontainers/runc/libcontainer"
|
||||||
"github.com/opencontainers/runc/libcontainer/configs"
|
"github.com/opencontainers/runc/libcontainer/configs"
|
||||||
_ "github.com/opencontainers/runc/libcontainer/nsenter"
|
_ "github.com/opencontainers/runc/libcontainer/nsenter"
|
||||||
|
@ -149,8 +150,8 @@ var mountPropagationMapping = map[string]int{
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if len(os.Args) > 1 && os.Args[1] == "init" {
|
if len(os.Args) > 1 && os.Args[1] == "init" {
|
||||||
runtime.GOMAXPROCS(1)
|
goruntime.GOMAXPROCS(1)
|
||||||
runtime.LockOSThread()
|
goruntime.LockOSThread()
|
||||||
factory, _ := libcontainer.New("")
|
factory, _ := libcontainer.New("")
|
||||||
if err := factory.StartInitialization(); err != nil {
|
if err := factory.StartInitialization(); err != nil {
|
||||||
fmt.Fprint(os.Stderr, err)
|
fmt.Fprint(os.Stderr, err)
|
||||||
|
@ -199,8 +200,8 @@ func (c *libcontainerContainer) Pause() error {
|
||||||
return c.c.Pause()
|
return c.c.Pause()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *libcontainerContainer) State() State {
|
func (c *libcontainerContainer) State() runtime.State {
|
||||||
s := State{}
|
s := runtime.State{}
|
||||||
// TODO: what to do with error
|
// TODO: what to do with error
|
||||||
state, err := c.c.Status()
|
state, err := c.c.Status()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -208,9 +209,9 @@ func (c *libcontainerContainer) State() State {
|
||||||
}
|
}
|
||||||
switch state {
|
switch state {
|
||||||
case libcontainer.Paused, libcontainer.Pausing:
|
case libcontainer.Paused, libcontainer.Pausing:
|
||||||
s.Status = Paused
|
s.Status = runtime.Paused
|
||||||
default:
|
default:
|
||||||
s.Status = Running
|
s.Status = runtime.Running
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -241,8 +242,8 @@ func (c *libcontainerContainer) Delete() error {
|
||||||
return c.c.Destroy()
|
return c.c.Destroy()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *libcontainerContainer) Processes() ([]Process, error) {
|
func (c *libcontainerContainer) Processes() ([]runtime.Process, error) {
|
||||||
procs := []Process{
|
procs := []runtime.Process{
|
||||||
c.initProcess,
|
c.initProcess,
|
||||||
}
|
}
|
||||||
for _, p := range c.additionalProcesses {
|
for _, p := range c.additionalProcesses {
|
||||||
|
@ -259,7 +260,7 @@ func (c *libcontainerContainer) RemoveProcess(pid int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRuntime(stateDir string) (Runtime, error) {
|
func NewRuntime(stateDir string) (runtime.Runtime, error) {
|
||||||
f, err := libcontainer.New(stateDir, libcontainer.Cgroupfs, func(l *libcontainer.LinuxFactory) error {
|
f, err := libcontainer.New(stateDir, libcontainer.Cgroupfs, func(l *libcontainer.LinuxFactory) error {
|
||||||
//l.CriuPath = context.GlobalString("criu")
|
//l.CriuPath = context.GlobalString("criu")
|
||||||
return nil
|
return nil
|
||||||
|
@ -276,7 +277,7 @@ type libcontainerRuntime struct {
|
||||||
factory libcontainer.Factory
|
factory libcontainer.Factory
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *Stdio) (Container, error) {
|
func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio) (runtime.Container, error) {
|
||||||
spec, rspec, err := r.loadSpec(
|
spec, rspec, err := r.loadSpec(
|
||||||
filepath.Join(bundlePath, "config.json"),
|
filepath.Join(bundlePath, "config.json"),
|
||||||
filepath.Join(bundlePath, "runtime.json"),
|
filepath.Join(bundlePath, "runtime.json"),
|
||||||
|
@ -308,7 +309,7 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *Stdio) (Conta
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *libcontainerRuntime) StartProcess(ci Container, p specs.Process, stdio *Stdio) (Process, error) {
|
func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process, stdio *runtime.Stdio) (runtime.Process, error) {
|
||||||
c, ok := ci.(*libcontainerContainer)
|
c, ok := ci.(*libcontainerContainer)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errInvalidContainerType
|
return nil, errInvalidContainerType
|
||||||
|
@ -334,7 +335,7 @@ func (r *libcontainerRuntime) StartProcess(ci Container, p specs.Process, stdio
|
||||||
|
|
||||||
// newProcess returns a new libcontainer Process with the arguments from the
|
// newProcess returns a new libcontainer Process with the arguments from the
|
||||||
// spec and stdio from the current process.
|
// spec and stdio from the current process.
|
||||||
func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *Stdio) (*libcontainer.Process, error) {
|
func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio) (*libcontainer.Process, error) {
|
||||||
var (
|
var (
|
||||||
stderr, stdout io.Writer
|
stderr, stdout io.Writer
|
||||||
)
|
)
|
||||||
|
|
|
@ -3,11 +3,12 @@ package containerd
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
goruntime "runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/containerd/runtime"
|
||||||
"github.com/opencontainers/runc/libcontainer"
|
"github.com/opencontainers/runc/libcontainer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -17,7 +18,7 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// register counters
|
// register counters
|
||||||
runtime, err := NewRuntime(stateDir)
|
r, err := NewRuntime(stateDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -27,9 +28,9 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
|
||||||
}
|
}
|
||||||
s := &Supervisor{
|
s := &Supervisor{
|
||||||
stateDir: stateDir,
|
stateDir: stateDir,
|
||||||
containers: make(map[string]Container),
|
containers: make(map[string]runtime.Container),
|
||||||
processes: make(map[int]Container),
|
processes: make(map[int]runtime.Container),
|
||||||
runtime: runtime,
|
runtime: r,
|
||||||
tasks: make(chan *startTask, concurrency*100),
|
tasks: make(chan *startTask, concurrency*100),
|
||||||
journal: j,
|
journal: j,
|
||||||
}
|
}
|
||||||
|
@ -54,10 +55,10 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
|
||||||
type Supervisor struct {
|
type Supervisor struct {
|
||||||
// stateDir is the directory on the system to store container runtime state information.
|
// stateDir is the directory on the system to store container runtime state information.
|
||||||
stateDir string
|
stateDir string
|
||||||
containers map[string]Container
|
containers map[string]runtime.Container
|
||||||
processes map[int]Container
|
processes map[int]runtime.Container
|
||||||
handlers map[EventType]Handler
|
handlers map[EventType]Handler
|
||||||
runtime Runtime
|
runtime runtime.Runtime
|
||||||
journal *journal
|
journal *journal
|
||||||
events chan *Event
|
events chan *Event
|
||||||
tasks chan *startTask
|
tasks chan *startTask
|
||||||
|
@ -86,7 +87,7 @@ func (s *Supervisor) Start(events chan *Event) error {
|
||||||
go func() {
|
go func() {
|
||||||
// allocate an entire thread to this goroutine for the main event loop
|
// allocate an entire thread to this goroutine for the main event loop
|
||||||
// so that nothing else is scheduled over the top of it.
|
// so that nothing else is scheduled over the top of it.
|
||||||
runtime.LockOSThread()
|
goruntime.LockOSThread()
|
||||||
for e := range events {
|
for e := range events {
|
||||||
s.journal.write(e)
|
s.journal.write(e)
|
||||||
h, ok := s.handlers[e.Type]
|
h, ok := s.handlers[e.Type]
|
||||||
|
@ -107,7 +108,7 @@ func (s *Supervisor) Start(events chan *Event) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Supervisor) getContainerForPid(pid int) (Container, error) {
|
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
|
||||||
for _, container := range s.containers {
|
for _, container := range s.containers {
|
||||||
cpid, err := container.Pid()
|
cpid, err := container.Pid()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -131,7 +132,7 @@ func (s *Supervisor) SendEvent(evt *Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type startTask struct {
|
type startTask struct {
|
||||||
container Container
|
container runtime.Container
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package containerd
|
package containerd
|
||||||
|
|
||||||
|
import "github.com/docker/containerd/runtime"
|
||||||
|
|
||||||
type UpdateEvent struct {
|
type UpdateEvent struct {
|
||||||
s *Supervisor
|
s *Supervisor
|
||||||
}
|
}
|
||||||
|
@ -11,11 +13,11 @@ func (h *UpdateEvent) Handle(e *Event) error {
|
||||||
}
|
}
|
||||||
if e.State.Status != "" {
|
if e.State.Status != "" {
|
||||||
switch e.State.Status {
|
switch e.State.Status {
|
||||||
case Running:
|
case runtime.Running:
|
||||||
if err := container.Resume(); err != nil {
|
if err := container.Resume(); err != nil {
|
||||||
return ErrUnknownContainerStatus
|
return ErrUnknownContainerStatus
|
||||||
}
|
}
|
||||||
case Paused:
|
case runtime.Paused:
|
||||||
if err := container.Pause(); err != nil {
|
if err := container.Pause(); err != nil {
|
||||||
return ErrUnknownContainerStatus
|
return ErrUnknownContainerStatus
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue