Merge branch 'logs'
This commit is contained in:
commit
1e6ebc63bb
18 changed files with 338 additions and 83 deletions
|
@ -1,21 +1,36 @@
|
||||||
package containerd
|
package containerd
|
||||||
|
|
||||||
|
import "github.com/Sirupsen/logrus"
|
||||||
|
|
||||||
type AddProcessEvent struct {
|
type AddProcessEvent struct {
|
||||||
s *Supervisor
|
s *Supervisor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container
|
||||||
|
// could be stopped and removed...
|
||||||
func (h *AddProcessEvent) Handle(e *Event) error {
|
func (h *AddProcessEvent) Handle(e *Event) error {
|
||||||
container, ok := h.s.containers[e.ID]
|
ci, ok := h.s.containers[e.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrContainerNotFound
|
return ErrContainerNotFound
|
||||||
}
|
}
|
||||||
p, err := h.s.runtime.StartProcess(container, *e.Process, e.Stdio)
|
p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
l, err := h.s.log(ci.container.Path(), io)
|
||||||
|
if err != nil {
|
||||||
|
// log the error but continue with the other commands
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"error": err,
|
||||||
|
"id": e.ID,
|
||||||
|
}).Error("log stdio")
|
||||||
|
}
|
||||||
if e.Pid, err = p.Pid(); err != nil {
|
if e.Pid, err = p.Pid(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
h.s.processes[e.Pid] = container
|
h.s.processes[e.Pid] = &containerInfo{
|
||||||
|
container: ci.container,
|
||||||
|
logger: l,
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
|
||||||
Name: c.Checkpoint,
|
Name: c.Checkpoint,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
e.Stdio = &runtime.Stdio{
|
|
||||||
Stderr: c.Stderr,
|
|
||||||
Stdout: c.Stdout,
|
|
||||||
}
|
|
||||||
s.sv.SendEvent(e)
|
s.sv.SendEvent(e)
|
||||||
if err := <-e.Err; err != nil {
|
if err := <-e.Err; err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -5,11 +5,11 @@ type CreateCheckpointEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *CreateCheckpointEvent) Handle(e *Event) error {
|
func (h *CreateCheckpointEvent) Handle(e *Event) error {
|
||||||
container, ok := h.s.containers[e.ID]
|
i, ok := h.s.containers[e.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrContainerNotFound
|
return ErrContainerNotFound
|
||||||
}
|
}
|
||||||
return container.Checkpoint(*e.Checkpoint)
|
return i.container.Checkpoint(*e.Checkpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
type DeleteCheckpointEvent struct {
|
type DeleteCheckpointEvent struct {
|
||||||
|
@ -17,9 +17,9 @@ type DeleteCheckpointEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *DeleteCheckpointEvent) Handle(e *Event) error {
|
func (h *DeleteCheckpointEvent) Handle(e *Event) error {
|
||||||
container, ok := h.s.containers[e.ID]
|
i, ok := h.s.containers[e.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrContainerNotFound
|
return ErrContainerNotFound
|
||||||
}
|
}
|
||||||
return container.DeleteCheckpoint(e.Checkpoint.Name)
|
return i.container.DeleteCheckpoint(e.Checkpoint.Name)
|
||||||
}
|
}
|
||||||
|
|
60
ctr/logs.go
Normal file
60
ctr/logs.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/codegangsta/cli"
|
||||||
|
"github.com/docker/containerd"
|
||||||
|
)
|
||||||
|
|
||||||
|
var LogsCommand = cli.Command{
|
||||||
|
Name: "logs",
|
||||||
|
Usage: "view binary container logs generated by containerd",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
cli.BoolFlag{
|
||||||
|
Name: "follow,f",
|
||||||
|
Usage: "follow/tail the logs",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(context *cli.Context) {
|
||||||
|
path := context.Args().First()
|
||||||
|
if path == "" {
|
||||||
|
fatal("path to the log cannot be empty", 1)
|
||||||
|
}
|
||||||
|
if err := readLogs(path, context.Bool("follow")); err != nil {
|
||||||
|
fatal(err.Error(), 1)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func readLogs(path string, follow bool) error {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
dec := json.NewDecoder(f)
|
||||||
|
for {
|
||||||
|
var msg *containerd.Message
|
||||||
|
if err := dec.Decode(&msg); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
if follow {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch msg.Stream {
|
||||||
|
case "stdout":
|
||||||
|
os.Stdout.Write(msg.Data)
|
||||||
|
case "stderr":
|
||||||
|
os.Stderr.Write(msg.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -34,9 +34,10 @@ func main() {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
ContainersCommand,
|
|
||||||
CheckpointCommand,
|
CheckpointCommand,
|
||||||
|
ContainersCommand,
|
||||||
EventsCommand,
|
EventsCommand,
|
||||||
|
LogsCommand,
|
||||||
}
|
}
|
||||||
app.Before = func(context *cli.Context) error {
|
app.Before = func(context *cli.Context) error {
|
||||||
if context.GlobalBool("debug") {
|
if context.GlobalBool("debug") {
|
||||||
|
|
|
@ -10,10 +10,15 @@ type DeleteEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *DeleteEvent) Handle(e *Event) error {
|
func (h *DeleteEvent) Handle(e *Event) error {
|
||||||
if container, ok := h.s.containers[e.ID]; ok {
|
if i, ok := h.s.containers[e.ID]; ok {
|
||||||
if err := h.deleteContainer(container); err != nil {
|
if err := h.deleteContainer(i.container); err != nil {
|
||||||
logrus.WithField("error", err).Error("containerd: deleting container")
|
logrus.WithField("error", err).Error("containerd: deleting container")
|
||||||
}
|
}
|
||||||
|
if i.logger != nil {
|
||||||
|
if err := i.logger.Close(); err != nil {
|
||||||
|
logrus.WithField("error", err).Error("containerd: close container logger")
|
||||||
|
}
|
||||||
|
}
|
||||||
h.s.notifySubscribers(&Event{
|
h.s.notifySubscribers(&Event{
|
||||||
Type: ExitEventType,
|
Type: ExitEventType,
|
||||||
ID: e.ID,
|
ID: e.ID,
|
||||||
|
|
1
event.go
1
event.go
|
@ -36,7 +36,6 @@ type Event struct {
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
ID string
|
ID string
|
||||||
BundlePath string
|
BundlePath string
|
||||||
Stdio *runtime.Stdio
|
|
||||||
Pid int
|
Pid int
|
||||||
Status int
|
Status int
|
||||||
Signal os.Signal
|
Signal os.Signal
|
||||||
|
|
11
exit.go
11
exit.go
|
@ -10,9 +10,9 @@ func (h *ExitEvent) Handle(e *Event) error {
|
||||||
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
|
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
|
||||||
Debug("containerd: process exited")
|
Debug("containerd: process exited")
|
||||||
// is it the child process of a container
|
// is it the child process of a container
|
||||||
if container, ok := h.s.processes[e.Pid]; ok {
|
if info, ok := h.s.processes[e.Pid]; ok {
|
||||||
ne := NewEvent(ExecExitEventType)
|
ne := NewEvent(ExecExitEventType)
|
||||||
ne.ID = container.ID()
|
ne.ID = info.container.ID()
|
||||||
ne.Pid = e.Pid
|
ne.Pid = e.Pid
|
||||||
ne.Status = e.Status
|
ne.Status = e.Status
|
||||||
h.s.SendEvent(ne)
|
h.s.SendEvent(ne)
|
||||||
|
@ -42,10 +42,13 @@ type ExecExitEvent struct {
|
||||||
|
|
||||||
func (h *ExecExitEvent) Handle(e *Event) error {
|
func (h *ExecExitEvent) Handle(e *Event) error {
|
||||||
// exec process: we remove this process without notifying the main event loop
|
// exec process: we remove this process without notifying the main event loop
|
||||||
container := h.s.processes[e.Pid]
|
info := h.s.processes[e.Pid]
|
||||||
if err := container.RemoveProcess(e.Pid); err != nil {
|
if err := info.container.RemoveProcess(e.Pid); err != nil {
|
||||||
logrus.WithField("error", err).Error("containerd: find container for pid")
|
logrus.WithField("error", err).Error("containerd: find container for pid")
|
||||||
}
|
}
|
||||||
|
if err := info.logger.Close(); err != nil {
|
||||||
|
logrus.WithField("error", err).Error("containerd: close process IO")
|
||||||
|
}
|
||||||
delete(h.s.processes, e.Pid)
|
delete(h.s.processes, e.Pid)
|
||||||
h.s.notifySubscribers(e)
|
h.s.notifySubscribers(e)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -5,8 +5,8 @@ type GetContainersEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *GetContainersEvent) Handle(e *Event) error {
|
func (h *GetContainersEvent) Handle(e *Event) error {
|
||||||
for _, c := range h.s.containers {
|
for _, i := range h.s.containers {
|
||||||
e.Containers = append(e.Containers, c)
|
e.Containers = append(e.Containers, i.container)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,6 +185,15 @@ func (p *libcontainerProcess) Signal(s os.Signal) error {
|
||||||
return p.process.Signal(s)
|
return p.process.Signal(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *libcontainerProcess) Close() error {
|
||||||
|
// in close we always need to call wait to close/flush any pipes
|
||||||
|
_, err := p.process.Wait()
|
||||||
|
p.process.Stdin.(io.Closer).Close()
|
||||||
|
p.process.Stdout.(io.Closer).Close()
|
||||||
|
p.process.Stderr.(io.Closer).Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
type libcontainerContainer struct {
|
type libcontainerContainer struct {
|
||||||
c libcontainer.Container
|
c libcontainer.Container
|
||||||
initProcess *libcontainerProcess
|
initProcess *libcontainerProcess
|
||||||
|
@ -306,6 +315,7 @@ func (c *libcontainerContainer) SetExited(status int) {
|
||||||
c.exitStatus = status
|
c.exitStatus = status
|
||||||
// meh
|
// meh
|
||||||
c.exited = true
|
c.exited = true
|
||||||
|
c.initProcess.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *libcontainerContainer) Stats() (*runtime.Stat, error) {
|
func (c *libcontainerContainer) Stats() (*runtime.Stat, error) {
|
||||||
|
@ -335,11 +345,13 @@ func (c *libcontainerContainer) Processes() ([]runtime.Process, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *libcontainerContainer) RemoveProcess(pid int) error {
|
func (c *libcontainerContainer) RemoveProcess(pid int) error {
|
||||||
if _, ok := c.additionalProcesses[pid]; !ok {
|
proc, ok := c.additionalProcesses[pid]
|
||||||
|
if !ok {
|
||||||
return runtime.ErrNotChildProcess
|
return runtime.ErrNotChildProcess
|
||||||
}
|
}
|
||||||
|
err := proc.Close()
|
||||||
delete(c.additionalProcesses, pid)
|
delete(c.additionalProcesses, pid)
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRuntime(stateDir string) (runtime.Runtime, error) {
|
func NewRuntime(stateDir string) (runtime.Runtime, error) {
|
||||||
|
@ -363,25 +375,29 @@ func (r *libcontainerRuntime) Type() string {
|
||||||
return "libcontainer"
|
return "libcontainer"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio) (runtime.Container, error) {
|
func (r *libcontainerRuntime) Create(id, bundlePath string) (runtime.Container, *runtime.IO, error) {
|
||||||
spec, rspec, err := r.loadSpec(
|
spec, rspec, err := r.loadSpec(
|
||||||
filepath.Join(bundlePath, "config.json"),
|
filepath.Join(bundlePath, "config.json"),
|
||||||
filepath.Join(bundlePath, "runtime.json"),
|
filepath.Join(bundlePath, "runtime.json"),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
config, err := r.createLibcontainerConfig(id, bundlePath, spec, rspec)
|
config, err := r.createLibcontainerConfig(id, bundlePath, spec, rspec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
container, err := r.factory.Create(id, config)
|
container, err := r.factory.Create(id, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create container: %v", err)
|
return nil, nil, fmt.Errorf("create container: %v", err)
|
||||||
}
|
}
|
||||||
process, err := r.newProcess(spec.Process, stdio)
|
process, err := r.newProcess(spec.Process)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
i, err := process.InitializeIO(int(spec.Process.User.UID))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
c := &libcontainerContainer{
|
c := &libcontainerContainer{
|
||||||
c: container,
|
c: container,
|
||||||
|
@ -392,20 +408,28 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio
|
||||||
},
|
},
|
||||||
path: bundlePath,
|
path: bundlePath,
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, &runtime.IO{
|
||||||
|
Stdin: i.Stdin,
|
||||||
|
Stdout: i.Stdout,
|
||||||
|
Stderr: i.Stderr,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process, stdio *runtime.Stdio) (runtime.Process, error) {
|
func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process) (runtime.Process, *runtime.IO, error) {
|
||||||
c, ok := ci.(*libcontainerContainer)
|
c, ok := ci.(*libcontainerContainer)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, runtime.ErrInvalidContainerType
|
return nil, nil, runtime.ErrInvalidContainerType
|
||||||
}
|
}
|
||||||
process, err := r.newProcess(p, stdio)
|
process, err := r.newProcess(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
i, err := process.InitializeIO(int(p.User.UID))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if err := c.c.Start(process); err != nil {
|
if err := c.c.Start(process); err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
lp := &libcontainerProcess{
|
lp := &libcontainerProcess{
|
||||||
process: process,
|
process: process,
|
||||||
|
@ -413,33 +437,22 @@ func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process
|
||||||
}
|
}
|
||||||
pid, err := process.Pid()
|
pid, err := process.Pid()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
c.additionalProcesses[pid] = lp
|
c.additionalProcesses[pid] = lp
|
||||||
return lp, nil
|
return lp, &runtime.IO{
|
||||||
|
Stdin: i.Stdin,
|
||||||
|
Stdout: i.Stdout,
|
||||||
|
Stderr: i.Stderr,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// newProcess returns a new libcontainer Process with the arguments from the
|
// newProcess returns a new libcontainer Process with the arguments from the
|
||||||
// spec and stdio from the current process.
|
// spec and stdio from the current process.
|
||||||
func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio) (*libcontainer.Process, error) {
|
func (r *libcontainerRuntime) newProcess(p specs.Process) (*libcontainer.Process, error) {
|
||||||
var (
|
// TODO: support terminals
|
||||||
stderr, stdout io.Writer
|
if p.Terminal {
|
||||||
)
|
return nil, runtime.ErrTerminalsNotSupported
|
||||||
if stdio != nil {
|
|
||||||
if stdio.Stdout != "" {
|
|
||||||
f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("open stdout: %v", err)
|
|
||||||
}
|
|
||||||
stdout = f
|
|
||||||
}
|
|
||||||
if stdio.Stderr != "" {
|
|
||||||
f, err := os.OpenFile(stdio.Stderr, os.O_CREATE|os.O_WRONLY, 0755)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("open stderr: %v", err)
|
|
||||||
}
|
|
||||||
stderr = f
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return &libcontainer.Process{
|
return &libcontainer.Process{
|
||||||
Args: p.Args,
|
Args: p.Args,
|
||||||
|
@ -447,8 +460,6 @@ func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio)
|
||||||
// TODO: fix libcontainer's API to better support uid/gid in a typesafe way.
|
// TODO: fix libcontainer's API to better support uid/gid in a typesafe way.
|
||||||
User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID),
|
User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID),
|
||||||
Cwd: p.Cwd,
|
Cwd: p.Cwd,
|
||||||
Stderr: stderr,
|
|
||||||
Stdout: stdout,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
110
log.go
Normal file
110
log.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
package containerd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type logConfig struct {
|
||||||
|
BundlePath string
|
||||||
|
LogSize int64 // in bytes
|
||||||
|
Stdin io.WriteCloser
|
||||||
|
Stdout io.ReadCloser
|
||||||
|
Stderr io.ReadCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLogger(i *logConfig) (*logger, error) {
|
||||||
|
l := &logger{
|
||||||
|
config: i,
|
||||||
|
messages: make(chan *Message, DefaultBufferSize),
|
||||||
|
}
|
||||||
|
f, err := os.OpenFile(
|
||||||
|
filepath.Join(l.config.BundlePath, "logs.json"),
|
||||||
|
os.O_CREATE|os.O_WRONLY|os.O_APPEND,
|
||||||
|
0655,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
l.f = f
|
||||||
|
hout := &logHandler{
|
||||||
|
stream: "stdout",
|
||||||
|
messages: l.messages,
|
||||||
|
}
|
||||||
|
herr := &logHandler{
|
||||||
|
stream: "stderr",
|
||||||
|
messages: l.messages,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
io.Copy(hout, i.Stdout)
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
io.Copy(herr, i.Stderr)
|
||||||
|
}()
|
||||||
|
l.start()
|
||||||
|
return l, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Stream string `json:"stream"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
Data []byte `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type logger struct {
|
||||||
|
config *logConfig
|
||||||
|
f *os.File
|
||||||
|
wg sync.WaitGroup
|
||||||
|
messages chan *Message
|
||||||
|
}
|
||||||
|
|
||||||
|
type logHandler struct {
|
||||||
|
stream string
|
||||||
|
messages chan *Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *logHandler) Write(b []byte) (int, error) {
|
||||||
|
h.messages <- &Message{
|
||||||
|
Stream: h.stream,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Data: b,
|
||||||
|
}
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logger) start() {
|
||||||
|
l.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
l.wg.Done()
|
||||||
|
enc := json.NewEncoder(l.f)
|
||||||
|
for m := range l.messages {
|
||||||
|
if err := enc.Encode(m); err != nil {
|
||||||
|
logrus.WithField("error", err).Error("write log message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logger) Close() (err error) {
|
||||||
|
for _, c := range []io.Closer{
|
||||||
|
l.config.Stdin,
|
||||||
|
l.config.Stdout,
|
||||||
|
l.config.Stderr,
|
||||||
|
} {
|
||||||
|
if cerr := c.Close(); err == nil {
|
||||||
|
err = cerr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(l.messages)
|
||||||
|
l.wg.Wait()
|
||||||
|
if ferr := l.f.Close(); err == nil {
|
||||||
|
err = ferr
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package runtime
|
package runtime
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -8,6 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Process interface {
|
type Process interface {
|
||||||
|
io.Closer
|
||||||
Pid() (int, error)
|
Pid() (int, error)
|
||||||
Spec() specs.Process
|
Spec() specs.Process
|
||||||
Signal(os.Signal) error
|
Signal(os.Signal) error
|
||||||
|
@ -24,9 +26,24 @@ type State struct {
|
||||||
Status Status
|
Status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
type Stdio struct {
|
type IO struct {
|
||||||
Stderr string
|
Stdin io.WriteCloser
|
||||||
Stdout string
|
Stdout io.ReadCloser
|
||||||
|
Stderr io.ReadCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *IO) Close() error {
|
||||||
|
var oerr error
|
||||||
|
for _, c := range []io.Closer{
|
||||||
|
i.Stdin,
|
||||||
|
i.Stdout,
|
||||||
|
i.Stderr,
|
||||||
|
} {
|
||||||
|
if err := c.Close(); oerr == nil {
|
||||||
|
oerr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return oerr
|
||||||
}
|
}
|
||||||
|
|
||||||
type Stat struct {
|
type Stat struct {
|
||||||
|
|
|
@ -12,13 +12,15 @@ var (
|
||||||
ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container")
|
ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container")
|
||||||
ErrCheckpointExists = errors.New("containerd: checkpoint already exists")
|
ErrCheckpointExists = errors.New("containerd: checkpoint already exists")
|
||||||
ErrContainerExited = errors.New("containerd: container has exited")
|
ErrContainerExited = errors.New("containerd: container has exited")
|
||||||
|
ErrTerminalsNotSupported = errors.New("containerd: terminals are not supported for runtime")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Runtime handles containers, containers handle their own actions
|
// Runtime handles containers, containers handle their own actions
|
||||||
type Runtime interface {
|
type Runtime interface {
|
||||||
// Create creates a new container initialized but without it starting it
|
// Type of the runtime
|
||||||
Create(id, bundlePath string, stdio *Stdio) (Container, error)
|
|
||||||
// StartProcess adds a new process to the container
|
|
||||||
StartProcess(Container, specs.Process, *Stdio) (Process, error)
|
|
||||||
Type() string
|
Type() string
|
||||||
|
// Create creates a new container initialized but without it starting it
|
||||||
|
Create(id, bundlePath string) (Container, *IO, error)
|
||||||
|
// StartProcess adds a new process to the container
|
||||||
|
StartProcess(Container, specs.Process) (Process, *IO, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,11 @@ type SignalEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *SignalEvent) Handle(e *Event) error {
|
func (h *SignalEvent) Handle(e *Event) error {
|
||||||
container, ok := h.s.containers[e.ID]
|
i, ok := h.s.containers[e.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrContainerNotFound
|
return ErrContainerNotFound
|
||||||
}
|
}
|
||||||
processes, err := container.Processes()
|
processes, err := i.container.Processes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
7
start.go
7
start.go
|
@ -5,15 +5,18 @@ type StartEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *StartEvent) Handle(e *Event) error {
|
func (h *StartEvent) Handle(e *Event) error {
|
||||||
container, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Stdio)
|
container, io, err := h.s.runtime.Create(e.ID, e.BundlePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
h.s.containerGroup.Add(1)
|
h.s.containerGroup.Add(1)
|
||||||
h.s.containers[e.ID] = container
|
h.s.containers[e.ID] = &containerInfo{
|
||||||
|
container: container,
|
||||||
|
}
|
||||||
ContainersCounter.Inc(1)
|
ContainersCounter.Inc(1)
|
||||||
task := &StartTask{
|
task := &StartTask{
|
||||||
Err: e.Err,
|
Err: e.Err,
|
||||||
|
IO: io,
|
||||||
Container: container,
|
Container: container,
|
||||||
}
|
}
|
||||||
if e.Checkpoint != nil {
|
if e.Checkpoint != nil {
|
||||||
|
|
|
@ -29,8 +29,8 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
|
||||||
}
|
}
|
||||||
s := &Supervisor{
|
s := &Supervisor{
|
||||||
stateDir: stateDir,
|
stateDir: stateDir,
|
||||||
containers: make(map[string]runtime.Container),
|
containers: make(map[string]*containerInfo),
|
||||||
processes: make(map[int]runtime.Container),
|
processes: make(map[int]*containerInfo),
|
||||||
runtime: r,
|
runtime: r,
|
||||||
tasks: tasks,
|
tasks: tasks,
|
||||||
events: make(chan *Event, DefaultBufferSize),
|
events: make(chan *Event, DefaultBufferSize),
|
||||||
|
@ -54,11 +54,16 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerInfo struct {
|
||||||
|
container runtime.Container
|
||||||
|
logger *logger
|
||||||
|
}
|
||||||
|
|
||||||
type Supervisor struct {
|
type Supervisor struct {
|
||||||
// stateDir is the directory on the system to store container runtime state information.
|
// stateDir is the directory on the system to store container runtime state information.
|
||||||
stateDir string
|
stateDir string
|
||||||
containers map[string]runtime.Container
|
containers map[string]*containerInfo
|
||||||
processes map[int]runtime.Container
|
processes map[int]*containerInfo
|
||||||
handlers map[EventType]Handler
|
handlers map[EventType]Handler
|
||||||
runtime runtime.Runtime
|
runtime runtime.Runtime
|
||||||
events chan *Event
|
events chan *Event
|
||||||
|
@ -78,7 +83,8 @@ func (s *Supervisor) Stop(sig chan os.Signal) {
|
||||||
// Close the tasks channel so that no new containers get started
|
// Close the tasks channel so that no new containers get started
|
||||||
close(s.tasks)
|
close(s.tasks)
|
||||||
// send a SIGTERM to all containers
|
// send a SIGTERM to all containers
|
||||||
for id, c := range s.containers {
|
for id, i := range s.containers {
|
||||||
|
c := i.container
|
||||||
logrus.WithField("id", id).Debug("sending TERM to container processes")
|
logrus.WithField("id", id).Debug("sending TERM to container processes")
|
||||||
procs, err := c.Processes()
|
procs, err := c.Processes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -193,7 +199,8 @@ func (s *Supervisor) Machine() Machine {
|
||||||
// getContainerForPid returns the container where the provided pid is the pid1 or main
|
// getContainerForPid returns the container where the provided pid is the pid1 or main
|
||||||
// process in the container
|
// process in the container
|
||||||
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
|
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
|
||||||
for _, container := range s.containers {
|
for _, i := range s.containers {
|
||||||
|
container := i.container
|
||||||
cpid, err := container.Pid()
|
cpid, err := container.Pid()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lerr, ok := err.(libcontainer.Error); ok {
|
if lerr, ok := err.(libcontainer.Error); ok {
|
||||||
|
@ -214,3 +221,17 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
|
||||||
func (s *Supervisor) SendEvent(evt *Event) {
|
func (s *Supervisor) SendEvent(evt *Event) {
|
||||||
s.events <- evt
|
s.events <- evt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) {
|
||||||
|
config := &logConfig{
|
||||||
|
BundlePath: path,
|
||||||
|
Stdin: i.Stdin,
|
||||||
|
Stdout: i.Stdout,
|
||||||
|
Stderr: i.Stderr,
|
||||||
|
}
|
||||||
|
l, err := newLogger(config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return l, nil
|
||||||
|
}
|
||||||
|
|
|
@ -7,10 +7,11 @@ type UpdateEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *UpdateEvent) Handle(e *Event) error {
|
func (h *UpdateEvent) Handle(e *Event) error {
|
||||||
container, ok := h.s.containers[e.ID]
|
i, ok := h.s.containers[e.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrContainerNotFound
|
return ErrContainerNotFound
|
||||||
}
|
}
|
||||||
|
container := i.container
|
||||||
if e.State.Status != "" {
|
if e.State.Status != "" {
|
||||||
switch e.State.Status {
|
switch e.State.Status {
|
||||||
case runtime.Running:
|
case runtime.Running:
|
||||||
|
|
11
worker.go
11
worker.go
|
@ -14,6 +14,7 @@ type Worker interface {
|
||||||
type StartTask struct {
|
type StartTask struct {
|
||||||
Container runtime.Container
|
Container runtime.Container
|
||||||
Checkpoint string
|
Checkpoint string
|
||||||
|
IO *runtime.IO
|
||||||
Err chan error
|
Err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +34,16 @@ func (w *worker) Start() {
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
for t := range w.s.tasks {
|
for t := range w.s.tasks {
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
|
// start logging the container's stdio
|
||||||
|
l, err := w.s.log(t.Container.Path(), t.IO)
|
||||||
|
if err != nil {
|
||||||
|
evt := NewEvent(DeleteEventType)
|
||||||
|
evt.ID = t.Container.ID()
|
||||||
|
w.s.SendEvent(evt)
|
||||||
|
t.Err <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
w.s.containers[t.Container.ID()].logger = l
|
||||||
if t.Checkpoint != "" {
|
if t.Checkpoint != "" {
|
||||||
if err := t.Container.Restore(t.Checkpoint); err != nil {
|
if err := t.Container.Restore(t.Checkpoint); err != nil {
|
||||||
evt := NewEvent(DeleteEventType)
|
evt := NewEvent(DeleteEventType)
|
||||||
|
|
Loading…
Reference in a new issue