Adds logic for tracking ipc storage driver process status

This allows requests to not hang if the child process exits
This commit is contained in:
Brian Bland 2014-11-06 14:06:16 -08:00
parent 0e8647f1ce
commit 31df62064d

View file

@ -3,6 +3,7 @@ package ipc
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net" "net"
@ -15,23 +16,29 @@ import (
"github.com/docker/libchan/spdy" "github.com/docker/libchan/spdy"
) )
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver loader expects driver // StorageDriverExecutablePrefix is the prefix which the IPC storage driver
// executables to begin with. For example, the s3 driver should be named "registry-storage-s3". // loader expects driver executables to begin with. For example, the s3 driver
// should be named "registry-storagedriver-s3".
const StorageDriverExecutablePrefix = "registry-storagedriver-" const StorageDriverExecutablePrefix = "registry-storagedriver-"
// StorageDriverClient is a storagedriver.StorageDriver implementation using a managed child process // StorageDriverClient is a storagedriver.StorageDriver implementation using a
// communicating over IPC using libchan with a unix domain socket // managed child process communicating over IPC using libchan with a unix domain
// socket
type StorageDriverClient struct { type StorageDriverClient struct {
subprocess *exec.Cmd subprocess *exec.Cmd
exitChan chan error
exitErr error
stopChan chan struct{}
socket *os.File socket *os.File
transport *spdy.Transport transport *spdy.Transport
sender libchan.Sender sender libchan.Sender
version storagedriver.Version version storagedriver.Version
} }
// NewDriverClient constructs a new out-of-process storage driver using the driver name and // NewDriverClient constructs a new out-of-process storage driver using the
// configuration parameters // driver name and configuration parameters
// A user must call Start on this driver client before remote method calls can be made // A user must call Start on this driver client before remote method calls can
// be made
// //
// Looks for drivers in the following locations in order: // Looks for drivers in the following locations in order:
// - Storage drivers directory (to be determined, yet not implemented) // - Storage drivers directory (to be determined, yet not implemented)
@ -56,9 +63,13 @@ func NewDriverClient(name string, parameters map[string]string) (*StorageDriverC
}, nil }, nil
} }
// Start starts the designated child process storage driver and binds a socket to this process for // Start starts the designated child process storage driver and binds a socket
// IPC method calls // to this process for IPC method calls
func (driver *StorageDriverClient) Start() error { func (driver *StorageDriverClient) Start() error {
driver.exitErr = nil
driver.exitChan = make(chan error)
driver.stopChan = make(chan struct{})
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
if err != nil { if err != nil {
return err return err
@ -76,6 +87,8 @@ func (driver *StorageDriverClient) Start() error {
return err return err
} }
go driver.handleSubprocessExit()
if err = childSocket.Close(); err != nil { if err = childSocket.Close(); err != nil {
driver.Stop() driver.Stop()
return err return err
@ -142,6 +155,10 @@ func (driver *StorageDriverClient) Stop() error {
if driver.subprocess != nil { if driver.subprocess != nil {
killErr = driver.subprocess.Process.Kill() killErr = driver.subprocess.Process.Kill()
} }
if driver.stopChan != nil {
driver.stopChan <- struct{}{}
close(driver.stopChan)
}
if closeSenderErr != nil { if closeSenderErr != nil {
return closeSenderErr return closeSenderErr
@ -150,12 +167,17 @@ func (driver *StorageDriverClient) Stop() error {
} else if closeSocketErr != nil { } else if closeSocketErr != nil {
return closeSocketErr return closeSocketErr
} }
return killErr return killErr
} }
// Implement the storagedriver.StorageDriver interface over IPC // Implement the storagedriver.StorageDriver interface over IPC
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe() receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path} params := map[string]interface{}{"Path": path}
@ -164,8 +186,8 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
return nil, err return nil, err
} }
var response ReadStreamResponse response := new(ReadStreamResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -183,6 +205,10 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
} }
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe() receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
@ -191,8 +217,8 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro
return err return err
} }
var response WriteStreamResponse response := new(WriteStreamResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return err return err
} }
@ -205,16 +231,19 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro
} }
func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset} params := map[string]interface{}{"Path": path, "Offset": offset}
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return nil, err return nil, err
} }
var response ReadStreamResponse response := new(ReadStreamResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -227,16 +256,19 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re
} }
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)} params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)}
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return err return err
} }
var response WriteStreamResponse response := new(WriteStreamResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return err return err
} }
@ -249,16 +281,19 @@ func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64,
} }
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return 0, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path} params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return 0, err return 0, err
} }
var response CurrentSizeResponse response := new(CurrentSizeResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -271,16 +306,19 @@ func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
} }
func (driver *StorageDriverClient) List(path string) ([]string, error) { func (driver *StorageDriverClient) List(path string) ([]string, error) {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path} params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return nil, err return nil, err
} }
var response ListResponse response := new(ListResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -293,16 +331,19 @@ func (driver *StorageDriverClient) List(path string) ([]string, error) {
} }
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return err return err
} }
var response MoveResponse response := new(MoveResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return err return err
} }
@ -315,16 +356,19 @@ func (driver *StorageDriverClient) Move(sourcePath string, destPath string) erro
} }
func (driver *StorageDriverClient) Delete(path string) error { func (driver *StorageDriverClient) Delete(path string) error {
receiver, remoteSender := libchan.Pipe() if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path} params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return err return err
} }
var response DeleteResponse response := new(DeleteResponse)
err = receiver.Receive(&response) err = driver.receiveResponse(receiver, response)
if err != nil { if err != nil {
return err return err
} }
@ -335,3 +379,66 @@ func (driver *StorageDriverClient) Delete(path string) error {
return nil return nil
} }
// handleSubprocessExit populates the exit channel until we have explicitly
// stopped the storage driver subprocess
// Requests can select on driver.exitChan and response receiving and not hang if
// the process exits
func (driver *StorageDriverClient) handleSubprocessExit() {
exitErr := driver.subprocess.Wait()
if exitErr == nil {
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
} else {
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
}
driver.exitErr = exitErr
for {
select {
case driver.exitChan <- exitErr:
case <-driver.stopChan:
close(driver.exitChan)
return
}
}
}
// receiveResponse populates the response value with the next result from the
// given receiver, or returns an error if receiving failed or the driver has
// stopped
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
receiveChan := make(chan error, 1)
go func(receiveChan chan<- error) {
defer close(receiveChan)
receiveChan <- receiver.Receive(response)
}(receiveChan)
var err error
var ok bool
select {
case err = <-receiveChan:
case err, ok = <-driver.exitChan:
go func(receiveChan <-chan error) {
<-receiveChan
}(receiveChan)
if !ok {
err = driver.exitErr
}
}
return err
}
// exited returns an exit error if the driver has exited or nil otherwise
func (driver *StorageDriverClient) exited() error {
select {
case err, ok := <-driver.exitChan:
if !ok {
return driver.exitErr
}
return err
default:
return nil
}
}