From ca4191ce41ec839fefcacfef438efa3790d8d5ce Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Mon, 14 Dec 2015 15:54:11 -0800 Subject: [PATCH] Use unix-socket as communication channel Signed-off-by: Alexander Morozov Signed-off-by: Michael Crosby Conflicts: ctr/container.go --- containerd/main.go | 14 ++++++++++++-- ctr/checkpoint.go | 6 +++--- ctr/container.go | 22 +++++++++++++++------- ctr/events.go | 2 +- ctr/main.go | 6 +++--- hack/benchmark.go | 18 +++++++++++++----- 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/containerd/main.go b/containerd/main.go index 8c6ea83..7aea853 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -57,6 +57,11 @@ var daemonFlags = []cli.Flag{ Value: 60 * time.Second, Usage: "interval for flushing metrics to the store", }, + cli.StringFlag{ + Name: "listen,l", + Value: "/run/containerd/containerd.sock", + Usage: "Address on which GRPC API will listen", + }, } func main() { @@ -81,6 +86,7 @@ func main() { app.Action = func(context *cli.Context) { if err := daemon( context.String("id"), + context.String("listen"), context.String("state-dir"), context.Int("concurrency"), ); err != nil { @@ -140,7 +146,7 @@ func processMetrics() { }() } -func daemon(id, stateDir string, concurrency int) error { +func daemon(id, address, stateDir string, concurrency int) error { tasks := make(chan *containerd.StartTask, concurrency*100) supervisor, err := containerd.NewSupervisor(id, stateDir, tasks) if err != nil { @@ -166,12 +172,16 @@ func daemon(id, stateDir string, concurrency int) error { if err := supervisor.Start(); err != nil { return err } - l, err := net.Listen("tcp", ":8888") + if err := os.RemoveAll(address); err != nil { + return err + } + l, err := net.Listen("unix", address) if err != nil { return err } s := grpc.NewServer() types.RegisterAPIServer(s, server.NewServer(supervisor)) + logrus.Debugf("GRPC API listen on %s", address) return s.Serve(l) } diff --git a/ctr/checkpoint.go b/ctr/checkpoint.go index 5960640..2d13e48 100644 --- a/ctr/checkpoint.go +++ b/ctr/checkpoint.go @@ -28,7 +28,7 @@ var ListCheckpointCommand = cli.Command{ func listCheckpoints(context *cli.Context) { var ( - c = getClient() + c = getClient(context) id = context.Args().First() ) if id == "" { @@ -82,7 +82,7 @@ var CreateCheckpointCommand = cli.Command{ if name == "" { fatal("checkpoint name cannot be empty", 1) } - c := getClient() + c := getClient(context) if _, err := c.CreateCheckpoint(netcontext.Background(), &types.CreateCheckpointRequest{ Id: containerID, Checkpoint: &types.Checkpoint{ @@ -108,7 +108,7 @@ var DeleteCheckpointCommand = cli.Command{ if name == "" { fatal("checkpoint name cannot be empty", 1) } - c := getClient() + c := getClient(context) if _, err := c.DeleteCheckpoint(netcontext.Background(), &types.DeleteCheckpointRequest{ Id: containerID, Name: name, diff --git a/ctr/container.go b/ctr/container.go index efd06cb..9348a6a 100644 --- a/ctr/container.go +++ b/ctr/container.go @@ -4,10 +4,12 @@ import ( "fmt" "io" "io/ioutil" + "net" "os" "path/filepath" "syscall" "text/tabwriter" + "time" "github.com/codegangsta/cli" "github.com/docker/containerd/api/grpc/types" @@ -18,8 +20,14 @@ import ( ) // TODO: parse flags and pass opts -func getClient() types.APIClient { - conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure()) +func getClient(ctx *cli.Context) types.APIClient { + dialOpts := []grpc.DialOption{grpc.WithInsecure()} + dialOpts = append(dialOpts, + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }, + )) + conn, err := grpc.Dial(ctx.GlobalString("address"), dialOpts...) if err != nil { fatal(err.Error(), 1) } @@ -46,7 +54,7 @@ var ListCommand = cli.Command{ } func listContainers(context *cli.Context) { - c := getClient() + c := getClient(context) resp, err := c.State(netcontext.Background(), &types.StateRequest{}) if err != nil { fatal(err.Error(), 1) @@ -90,7 +98,7 @@ var StartCommand = cli.Command{ if id == "" { fatal("container id cannot be empty", 1) } - c := getClient() + c := getClient(context) events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) if err != nil { fatal(err.Error(), 1) @@ -223,7 +231,7 @@ var KillCommand = cli.Command{ if id == "" { fatal("container id cannot be empty", 1) } - c := getClient() + c := getClient(context) if _, err := c.Signal(netcontext.Background(), &types.SignalRequest{ Id: id, Pid: uint32(context.Int("pid")), @@ -276,7 +284,7 @@ var ExecCommand = cli.Command{ Gid: uint32(context.Int("gid")), }, } - c := getClient() + c := getClient(context) if _, err := c.AddProcess(netcontext.Background(), p); err != nil { fatal(err.Error(), 1) } @@ -290,7 +298,7 @@ var StatsCommand = cli.Command{ req := &types.StatsRequest{ Id: context.Args().First(), } - c := getClient() + c := getClient(context) stream, err := c.GetStats(netcontext.Background(), req) if err != nil { fatal(err.Error(), 1) diff --git a/ctr/events.go b/ctr/events.go index fd00a92..06a1c54 100644 --- a/ctr/events.go +++ b/ctr/events.go @@ -14,7 +14,7 @@ var EventsCommand = cli.Command{ Name: "events", Usage: "receive events from the containerd daemon", Action: func(context *cli.Context) { - c := getClient() + c := getClient(context) events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) if err != nil { fatal(err.Error(), 1) diff --git a/ctr/main.go b/ctr/main.go index 632f168..54294cd 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -28,9 +28,9 @@ func main() { Usage: "enable debug output in the logs", }, cli.StringFlag{ - Name: "addr", - Value: "http://localhost:8888", - Usage: "address to the containerd api", + Name: "address", + Value: "/run/containerd/containerd.sock", + Usage: "address of GRPC API", }, } app.Commands = []cli.Command{ diff --git a/hack/benchmark.go b/hack/benchmark.go index 50dc69f..aa537de 100644 --- a/hack/benchmark.go +++ b/hack/benchmark.go @@ -5,6 +5,7 @@ package main import ( "flag" + "net" "strconv" "sync" "time" @@ -17,19 +18,26 @@ import ( func init() { flag.StringVar(&bundle, "bundle", "/containers/redis", "the bundle path") + flag.StringVar(&addr, "addr", "/run/containerd/containerd.sock", "address to the container d instance") flag.IntVar(&count, "count", 1000, "number of containers to run") flag.Parse() } var ( - count int - bundle string - group = sync.WaitGroup{} - jobs = make(chan string, 20) + count int + bundle, addr string + group = sync.WaitGroup{} + jobs = make(chan string, 20) ) func getClient() types.APIClient { - conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure()) + dialOpts := []grpc.DialOption{grpc.WithInsecure()} + dialOpts = append(dialOpts, + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }, + )) + conn, err := grpc.Dial(addr, dialOpts...) if err != nil { logrus.Fatal(err) }