8c74da3983
With this changeset, image store access is now moved to completely accessible over GRPC. No clients manipulate the image store database directly and the GRPC client is fully featured. The metadata database is now managed by the daemon and access coordinated via services. Signed-off-by: Stephen J Day <stephen.day@docker.com>
172 lines
4.1 KiB
Go
172 lines
4.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
gocontext "context"
|
|
|
|
contentapi "github.com/containerd/containerd/api/services/content"
|
|
"github.com/containerd/containerd/api/services/execution"
|
|
imagesapi "github.com/containerd/containerd/api/services/images"
|
|
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
|
|
"github.com/containerd/containerd/api/types/container"
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/images"
|
|
contentservice "github.com/containerd/containerd/services/content"
|
|
imagesservice "github.com/containerd/containerd/services/images"
|
|
"github.com/pkg/errors"
|
|
"github.com/tonistiigi/fifo"
|
|
"github.com/urfave/cli"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/grpclog"
|
|
)
|
|
|
|
var grpcConn *grpc.ClientConn
|
|
|
|
func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup, error) {
|
|
var wg sync.WaitGroup
|
|
ctx := gocontext.Background()
|
|
|
|
f, err := fifo.OpenFifo(ctx, stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func(c io.Closer) {
|
|
if err != nil {
|
|
c.Close()
|
|
}
|
|
}(f)
|
|
go func(w io.WriteCloser) {
|
|
io.Copy(w, os.Stdin)
|
|
w.Close()
|
|
}(f)
|
|
|
|
f, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func(c io.Closer) {
|
|
if err != nil {
|
|
c.Close()
|
|
}
|
|
}(f)
|
|
wg.Add(1)
|
|
go func(r io.ReadCloser) {
|
|
io.Copy(os.Stdout, r)
|
|
r.Close()
|
|
wg.Done()
|
|
}(f)
|
|
|
|
f, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func(c io.Closer) {
|
|
if err != nil {
|
|
c.Close()
|
|
}
|
|
}(f)
|
|
if !console {
|
|
wg.Add(1)
|
|
go func(r io.ReadCloser) {
|
|
io.Copy(os.Stderr, r)
|
|
r.Close()
|
|
wg.Done()
|
|
}(f)
|
|
}
|
|
|
|
return &wg, nil
|
|
}
|
|
|
|
func getGRPCConnection(context *cli.Context) (*grpc.ClientConn, error) {
|
|
if grpcConn != nil {
|
|
return grpcConn, nil
|
|
}
|
|
|
|
bindSocket := context.GlobalString("socket")
|
|
// reset the logger for grpc to log to dev/null so that it does not mess with our stdio
|
|
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
|
|
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)}
|
|
dialOpts = append(dialOpts,
|
|
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
|
return net.DialTimeout("unix", bindSocket, timeout)
|
|
},
|
|
))
|
|
|
|
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to dial %q", bindSocket)
|
|
}
|
|
|
|
grpcConn = conn
|
|
return grpcConn, nil
|
|
}
|
|
|
|
func getExecutionService(context *cli.Context) (execution.ContainerServiceClient, error) {
|
|
conn, err := getGRPCConnection(context)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return execution.NewContainerServiceClient(conn), nil
|
|
}
|
|
|
|
func getContentProvider(context *cli.Context) (content.Provider, error) {
|
|
conn, err := getGRPCConnection(context)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), nil
|
|
}
|
|
|
|
func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) {
|
|
conn, err := getGRPCConnection(context)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rootfsapi.NewRootFSClient(conn), nil
|
|
}
|
|
|
|
func getImageStore(clicontext *cli.Context) (images.Store, error) {
|
|
conn, err := getGRPCConnection(clicontext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil
|
|
}
|
|
|
|
func getTempDir(id string) (string, error) {
|
|
err := os.MkdirAll(filepath.Join(os.TempDir(), "ctr"), 0700)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
tmpDir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "ctr"), fmt.Sprintf("%s-", id))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return tmpDir, nil
|
|
}
|
|
|
|
func waitContainer(events execution.ContainerService_EventsClient, response *execution.CreateResponse) (uint32, error) {
|
|
for {
|
|
e, err := events.Recv()
|
|
if err != nil {
|
|
return 255, err
|
|
}
|
|
if e.Type != container.Event_EXIT {
|
|
continue
|
|
}
|
|
if e.ID == response.ID &&
|
|
e.Pid == response.Pid {
|
|
return e.ExitStatus, nil
|
|
}
|
|
}
|
|
}
|