Remove daemon subcommand
Removing subcommands for the containerd binary because we will have a separate ctctl or other named binary for client actions. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
8f8f365787
commit
ba46df11b4
3 changed files with 102 additions and 205 deletions
|
@ -1,114 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/docker/containerd"
|
||||
"github.com/docker/containerd/api/v1"
|
||||
"github.com/opencontainers/runc/libcontainer/utils"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
)
|
||||
|
||||
var DaemonCommand = cli.Command{
|
||||
Name: "daemon",
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "state-dir",
|
||||
Value: "/run/containerd",
|
||||
Usage: "runtime state directory",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "buffer-size",
|
||||
Value: 2048,
|
||||
Usage: "set the channel buffer size for events and signals",
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) {
|
||||
if context.GlobalBool("debug") {
|
||||
l := log.New(os.Stdout, "[containerd] ", log.LstdFlags)
|
||||
goRoutineCounter := metrics.NewGauge()
|
||||
metrics.DefaultRegistry.Register("goroutines", goRoutineCounter)
|
||||
for name, m := range containerd.Metrics() {
|
||||
if err := metrics.DefaultRegistry.Register(name, m); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for range time.Tick(30 * time.Second) {
|
||||
goRoutineCounter.Update(int64(runtime.NumGoroutine()))
|
||||
}
|
||||
}()
|
||||
go metrics.Log(metrics.DefaultRegistry, 60*time.Second, l)
|
||||
}
|
||||
if err := daemon(context.String("state-dir"), 10, context.Int("buffer-size")); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func daemon(stateDir string, concurrency, bufferSize int) error {
|
||||
supervisor, err := containerd.NewSupervisor(stateDir, concurrency)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events := make(chan *containerd.Event, bufferSize)
|
||||
// start the signal handler in the background.
|
||||
go startSignalHandler(supervisor, bufferSize)
|
||||
if err := supervisor.Start(events); err != nil {
|
||||
return err
|
||||
}
|
||||
server := v1.NewServer(supervisor)
|
||||
return http.ListenAndServe("localhost:8888", server)
|
||||
}
|
||||
|
||||
func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
|
||||
logrus.Debug("containerd: starting signal handler")
|
||||
signals := make(chan os.Signal, bufferSize)
|
||||
signal.Notify(signals)
|
||||
for s := range signals {
|
||||
switch s {
|
||||
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP:
|
||||
supervisor.Close()
|
||||
os.Exit(0)
|
||||
case syscall.SIGCHLD:
|
||||
exits, err := reap()
|
||||
if err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: reaping child processes")
|
||||
}
|
||||
for _, e := range exits {
|
||||
supervisor.SendEvent(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func reap() (exits []*containerd.Event, err error) {
|
||||
var (
|
||||
ws syscall.WaitStatus
|
||||
rus syscall.Rusage
|
||||
)
|
||||
for {
|
||||
pid, err := syscall.Wait4(-1, &ws, syscall.WNOHANG, &rus)
|
||||
if err != nil {
|
||||
if err == syscall.ECHILD {
|
||||
return exits, nil
|
||||
}
|
||||
return exits, err
|
||||
}
|
||||
if pid <= 0 {
|
||||
return exits, nil
|
||||
}
|
||||
e := containerd.NewEvent(containerd.ExitEventType)
|
||||
e.Pid = pid
|
||||
e.Status = utils.ExitStatus(ws)
|
||||
exits = append(exits, e)
|
||||
}
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/docker/containerd"
|
||||
)
|
||||
|
||||
var JournalCommand = cli.Command{
|
||||
Name: "journal",
|
||||
Usage: "interact with the containerd journal",
|
||||
Subcommands: []cli.Command{
|
||||
JournalReplyCommand,
|
||||
},
|
||||
}
|
||||
|
||||
var JournalReplyCommand = cli.Command{
|
||||
Name: "replay",
|
||||
Usage: "replay a journal to get containerd's state syncronized after a crash",
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "addr",
|
||||
Value: "localhost:8888",
|
||||
Usage: "address of the containerd daemon",
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) {
|
||||
if err := replay(context.Args().First(), context.String("addr")); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func replay(path, addr string) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
dec := json.NewDecoder(f)
|
||||
var events []*containerd.Event
|
||||
type entry struct {
|
||||
Event *containerd.Event `json:"event"`
|
||||
}
|
||||
for dec.More() {
|
||||
var e entry
|
||||
if err := dec.Decode(&e); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
events = append(events, e.Event)
|
||||
}
|
||||
c := &http.Client{}
|
||||
for _, e := range events {
|
||||
switch e.Type {
|
||||
case containerd.ExitEventType, containerd.DeleteEventType:
|
||||
// ignore these types of events
|
||||
continue
|
||||
}
|
||||
data, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("sending %q event\n", e.Type)
|
||||
r, err := c.Post("http://"+filepath.Join(addr, "event"), "application/json", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r.Body != nil {
|
||||
io.Copy(os.Stdout, r.Body)
|
||||
r.Body.Close()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -1,10 +1,20 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/docker/containerd"
|
||||
"github.com/docker/containerd/api/v1"
|
||||
"github.com/opencontainers/runc/libcontainer/utils"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -23,21 +33,107 @@ func main() {
|
|||
Email: "crosbymichael@gmail.com",
|
||||
},
|
||||
}
|
||||
app.Commands = []cli.Command{
|
||||
DaemonCommand,
|
||||
JournalCommand,
|
||||
}
|
||||
app.Flags = []cli.Flag{
|
||||
cli.BoolFlag{Name: "debug", Usage: "enable debug output in the logs"},
|
||||
// cli.StringFlag{Name: "metrics", Value: "stdout", Usage: "metrics file"},
|
||||
cli.BoolFlag{
|
||||
Name: "debug",
|
||||
Usage: "enable debug output in the logs",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "state-dir",
|
||||
Value: "/run/containerd",
|
||||
Usage: "runtime state directory",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "buffer-size",
|
||||
Value: 2048,
|
||||
Usage: "set the channel buffer size for events and signals",
|
||||
},
|
||||
}
|
||||
app.Before = func(context *cli.Context) error {
|
||||
if context.GlobalBool("debug") {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
l := log.New(os.Stdout, "[containerd] ", log.LstdFlags)
|
||||
goRoutineCounter := metrics.NewGauge()
|
||||
metrics.DefaultRegistry.Register("goroutines", goRoutineCounter)
|
||||
for name, m := range containerd.Metrics() {
|
||||
if err := metrics.DefaultRegistry.Register(name, m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for range time.Tick(30 * time.Second) {
|
||||
goRoutineCounter.Update(int64(runtime.NumGoroutine()))
|
||||
}
|
||||
}()
|
||||
go metrics.Log(metrics.DefaultRegistry, 60*time.Second, l)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
app.Action = func(context *cli.Context) {
|
||||
if err := daemon(context.String("state-dir"), 10, context.Int("buffer-size")); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func daemon(stateDir string, concurrency, bufferSize int) error {
|
||||
supervisor, err := containerd.NewSupervisor(stateDir, concurrency)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events := make(chan *containerd.Event, bufferSize)
|
||||
// start the signal handler in the background.
|
||||
go startSignalHandler(supervisor, bufferSize)
|
||||
if err := supervisor.Start(events); err != nil {
|
||||
return err
|
||||
}
|
||||
server := v1.NewServer(supervisor)
|
||||
return http.ListenAndServe("localhost:8888", server)
|
||||
}
|
||||
|
||||
func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
|
||||
logrus.Debug("containerd: starting signal handler")
|
||||
signals := make(chan os.Signal, bufferSize)
|
||||
signal.Notify(signals)
|
||||
for s := range signals {
|
||||
switch s {
|
||||
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP:
|
||||
supervisor.Close()
|
||||
os.Exit(0)
|
||||
case syscall.SIGCHLD:
|
||||
exits, err := reap()
|
||||
if err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: reaping child processes")
|
||||
}
|
||||
for _, e := range exits {
|
||||
supervisor.SendEvent(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func reap() (exits []*containerd.Event, err error) {
|
||||
var (
|
||||
ws syscall.WaitStatus
|
||||
rus syscall.Rusage
|
||||
)
|
||||
for {
|
||||
pid, err := syscall.Wait4(-1, &ws, syscall.WNOHANG, &rus)
|
||||
if err != nil {
|
||||
if err == syscall.ECHILD {
|
||||
return exits, nil
|
||||
}
|
||||
return exits, err
|
||||
}
|
||||
if pid <= 0 {
|
||||
return exits, nil
|
||||
}
|
||||
e := containerd.NewEvent(containerd.ExitEventType)
|
||||
e.Pid = pid
|
||||
e.Status = utils.ExitStatus(ws)
|
||||
exits = append(exits, e)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue