Use unix-socket as communication channel

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Conflicts:
	ctr/container.go
This commit is contained in:
Alexander Morozov 2015-12-14 15:54:11 -08:00 committed by Michael Crosby
parent e16dfc36a5
commit ca4191ce41
6 changed files with 47 additions and 21 deletions

View file

@ -57,6 +57,11 @@ var daemonFlags = []cli.Flag{
Value: 60 * time.Second, Value: 60 * time.Second,
Usage: "interval for flushing metrics to the store", Usage: "interval for flushing metrics to the store",
}, },
cli.StringFlag{
Name: "listen,l",
Value: "/run/containerd/containerd.sock",
Usage: "Address on which GRPC API will listen",
},
} }
func main() { func main() {
@ -81,6 +86,7 @@ func main() {
app.Action = func(context *cli.Context) { app.Action = func(context *cli.Context) {
if err := daemon( if err := daemon(
context.String("id"), context.String("id"),
context.String("listen"),
context.String("state-dir"), context.String("state-dir"),
context.Int("concurrency"), context.Int("concurrency"),
); err != nil { ); err != nil {
@ -140,7 +146,7 @@ func processMetrics() {
}() }()
} }
func daemon(id, stateDir string, concurrency int) error { func daemon(id, address, stateDir string, concurrency int) error {
tasks := make(chan *containerd.StartTask, concurrency*100) tasks := make(chan *containerd.StartTask, concurrency*100)
supervisor, err := containerd.NewSupervisor(id, stateDir, tasks) supervisor, err := containerd.NewSupervisor(id, stateDir, tasks)
if err != nil { if err != nil {
@ -166,12 +172,16 @@ func daemon(id, stateDir string, concurrency int) error {
if err := supervisor.Start(); err != nil { if err := supervisor.Start(); err != nil {
return err return err
} }
l, err := net.Listen("tcp", ":8888") if err := os.RemoveAll(address); err != nil {
return err
}
l, err := net.Listen("unix", address)
if err != nil { if err != nil {
return err return err
} }
s := grpc.NewServer() s := grpc.NewServer()
types.RegisterAPIServer(s, server.NewServer(supervisor)) types.RegisterAPIServer(s, server.NewServer(supervisor))
logrus.Debugf("GRPC API listen on %s", address)
return s.Serve(l) return s.Serve(l)
} }

View file

@ -28,7 +28,7 @@ var ListCheckpointCommand = cli.Command{
func listCheckpoints(context *cli.Context) { func listCheckpoints(context *cli.Context) {
var ( var (
c = getClient() c = getClient(context)
id = context.Args().First() id = context.Args().First()
) )
if id == "" { if id == "" {
@ -82,7 +82,7 @@ var CreateCheckpointCommand = cli.Command{
if name == "" { if name == "" {
fatal("checkpoint name cannot be empty", 1) fatal("checkpoint name cannot be empty", 1)
} }
c := getClient() c := getClient(context)
if _, err := c.CreateCheckpoint(netcontext.Background(), &types.CreateCheckpointRequest{ if _, err := c.CreateCheckpoint(netcontext.Background(), &types.CreateCheckpointRequest{
Id: containerID, Id: containerID,
Checkpoint: &types.Checkpoint{ Checkpoint: &types.Checkpoint{
@ -108,7 +108,7 @@ var DeleteCheckpointCommand = cli.Command{
if name == "" { if name == "" {
fatal("checkpoint name cannot be empty", 1) fatal("checkpoint name cannot be empty", 1)
} }
c := getClient() c := getClient(context)
if _, err := c.DeleteCheckpoint(netcontext.Background(), &types.DeleteCheckpointRequest{ if _, err := c.DeleteCheckpoint(netcontext.Background(), &types.DeleteCheckpointRequest{
Id: containerID, Id: containerID,
Name: name, Name: name,

View file

@ -4,10 +4,12 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"path/filepath" "path/filepath"
"syscall" "syscall"
"text/tabwriter" "text/tabwriter"
"time"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/api/grpc/types"
@ -18,8 +20,14 @@ import (
) )
// TODO: parse flags and pass opts // TODO: parse flags and pass opts
func getClient() types.APIClient { func getClient(ctx *cli.Context) types.APIClient {
conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure()) dialOpts := []grpc.DialOption{grpc.WithInsecure()}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
},
))
conn, err := grpc.Dial(ctx.GlobalString("address"), dialOpts...)
if err != nil { if err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
} }
@ -46,7 +54,7 @@ var ListCommand = cli.Command{
} }
func listContainers(context *cli.Context) { func listContainers(context *cli.Context) {
c := getClient() c := getClient(context)
resp, err := c.State(netcontext.Background(), &types.StateRequest{}) resp, err := c.State(netcontext.Background(), &types.StateRequest{})
if err != nil { if err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
@ -90,7 +98,7 @@ var StartCommand = cli.Command{
if id == "" { if id == "" {
fatal("container id cannot be empty", 1) fatal("container id cannot be empty", 1)
} }
c := getClient() c := getClient(context)
events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) events, err := c.Events(netcontext.Background(), &types.EventsRequest{})
if err != nil { if err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
@ -223,7 +231,7 @@ var KillCommand = cli.Command{
if id == "" { if id == "" {
fatal("container id cannot be empty", 1) fatal("container id cannot be empty", 1)
} }
c := getClient() c := getClient(context)
if _, err := c.Signal(netcontext.Background(), &types.SignalRequest{ if _, err := c.Signal(netcontext.Background(), &types.SignalRequest{
Id: id, Id: id,
Pid: uint32(context.Int("pid")), Pid: uint32(context.Int("pid")),
@ -276,7 +284,7 @@ var ExecCommand = cli.Command{
Gid: uint32(context.Int("gid")), Gid: uint32(context.Int("gid")),
}, },
} }
c := getClient() c := getClient(context)
if _, err := c.AddProcess(netcontext.Background(), p); err != nil { if _, err := c.AddProcess(netcontext.Background(), p); err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)
} }
@ -290,7 +298,7 @@ var StatsCommand = cli.Command{
req := &types.StatsRequest{ req := &types.StatsRequest{
Id: context.Args().First(), Id: context.Args().First(),
} }
c := getClient() c := getClient(context)
stream, err := c.GetStats(netcontext.Background(), req) stream, err := c.GetStats(netcontext.Background(), req)
if err != nil { if err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)

View file

@ -14,7 +14,7 @@ var EventsCommand = cli.Command{
Name: "events", Name: "events",
Usage: "receive events from the containerd daemon", Usage: "receive events from the containerd daemon",
Action: func(context *cli.Context) { Action: func(context *cli.Context) {
c := getClient() c := getClient(context)
events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) events, err := c.Events(netcontext.Background(), &types.EventsRequest{})
if err != nil { if err != nil {
fatal(err.Error(), 1) fatal(err.Error(), 1)

View file

@ -28,9 +28,9 @@ func main() {
Usage: "enable debug output in the logs", Usage: "enable debug output in the logs",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "addr", Name: "address",
Value: "http://localhost:8888", Value: "/run/containerd/containerd.sock",
Usage: "address to the containerd api", Usage: "address of GRPC API",
}, },
} }
app.Commands = []cli.Command{ app.Commands = []cli.Command{

View file

@ -5,6 +5,7 @@ package main
import ( import (
"flag" "flag"
"net"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -17,19 +18,26 @@ import (
func init() { func init() {
flag.StringVar(&bundle, "bundle", "/containers/redis", "the bundle path") flag.StringVar(&bundle, "bundle", "/containers/redis", "the bundle path")
flag.StringVar(&addr, "addr", "/run/containerd/containerd.sock", "address to the container d instance")
flag.IntVar(&count, "count", 1000, "number of containers to run") flag.IntVar(&count, "count", 1000, "number of containers to run")
flag.Parse() flag.Parse()
} }
var ( var (
count int count int
bundle string bundle, addr string
group = sync.WaitGroup{} group = sync.WaitGroup{}
jobs = make(chan string, 20) jobs = make(chan string, 20)
) )
func getClient() types.APIClient { func getClient() types.APIClient {
conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure()) dialOpts := []grpc.DialOption{grpc.WithInsecure()}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
},
))
conn, err := grpc.Dial(addr, dialOpts...)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }