Merge pull request #670 from stevvooe/remove-ipc

Remove half-baked Storage Driver IPC support
This commit is contained in:
Stephen Day 2015-06-30 19:21:54 -07:00
commit 940b865bc0
10 changed files with 49 additions and 911 deletions

View file

@ -59,11 +59,5 @@ func init() {
return "" return ""
} }
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck) testsuites.RegisterSuite(azureDriverConstructor, skipCheck)
// testsuites.RegisterIPCSuite(driverName, map[string]string{
// paramAccountName: accountName,
// paramAccountKey: accountKey,
// paramContainer: container,
// paramRealm: realm,
// }, skipCheck)
} }

View file

@ -33,30 +33,14 @@ func Register(name string, factory StorageDriverFactory) {
driverFactories[name] = factory driverFactories[name] = factory
} }
// Create a new storagedriver.StorageDriver with the given name and parameters // Create a new storagedriver.StorageDriver with the given name and
// To run in-process, the StorageDriverFactory must first be registered with the given name // parameters. To use a driver, the StorageDriverFactory must first be
// If no in-process drivers are found with the given name, this attempts to create an IPC driver // registered with the given name. If no drivers are found, an
// If no in-process or external drivers are found, an InvalidStorageDriverError is returned // InvalidStorageDriverError is returned
func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) { func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
driverFactory, ok := driverFactories[name] driverFactory, ok := driverFactories[name]
if !ok { if !ok {
return nil, InvalidStorageDriverError{name} return nil, InvalidStorageDriverError{name}
// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
// server and client need to be updated for the changed API calls and
// there were some problems libchan hanging. We'll phase this
// functionality back in over the next few weeks.
// No registered StorageDriverFactory found, try ipc
// driverClient, err := ipc.NewDriverClient(name, parameters)
// if err != nil {
// return nil, InvalidStorageDriverError{name}
// }
// err = driverClient.Start()
// if err != nil {
// return nil, err
// }
// return driverClient, nil
} }
return driverFactory.Create(parameters) return driverFactory.Create(parameters)
} }

View file

@ -20,10 +20,7 @@ func init() {
} }
defer os.Remove(root) defer os.Remove(root)
testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) { testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return New(root), nil return New(root), nil
}, testsuites.NeverSkip) }, testsuites.NeverSkip)
// BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later.
// testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip)
} }

View file

@ -5,7 +5,6 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/testsuites" "github.com/docker/distribution/registry/storage/driver/testsuites"
"gopkg.in/check.v1" "gopkg.in/check.v1"
) )
@ -16,9 +15,5 @@ func init() {
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) { inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
return New(), nil return New(), nil
} }
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) testsuites.RegisterSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
// the problems with libchan.
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
} }

View file

@ -1,454 +0,0 @@
// +build ignore
package ipc
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"syscall"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver
// loader expects driver executables to begin with. For example, the s3 driver
// should be named "registry-storagedriver-s3".
const StorageDriverExecutablePrefix = "registry-storagedriver-"
// StorageDriverClient is a storagedriver.StorageDriver implementation using a
// managed child process communicating over IPC using libchan with a unix domain
// socket
type StorageDriverClient struct {
subprocess *exec.Cmd
exitChan chan error
exitErr error
stopChan chan struct{}
socket *os.File
transport *spdy.Transport
sender libchan.Sender
version storagedriver.Version
}
// NewDriverClient constructs a new out-of-process storage driver using the
// driver name and configuration parameters
// A user must call Start on this driver client before remote method calls can
// be made
//
// Looks for drivers in the following locations in order:
// - Storage drivers directory (to be determined, yet not implemented)
// - $GOPATH/bin
// - $PATH
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
paramsBytes, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
driverExecName := StorageDriverExecutablePrefix + name
driverPath, err := exec.LookPath(driverExecName)
if err != nil {
return nil, err
}
command := exec.Command(driverPath, string(paramsBytes))
return &StorageDriverClient{
subprocess: command,
}, nil
}
// Start starts the designated child process storage driver and binds a socket
// to this process for IPC method calls
func (driver *StorageDriverClient) Start() error {
driver.exitErr = nil
driver.exitChan = make(chan error)
driver.stopChan = make(chan struct{})
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
if err != nil {
return err
}
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
driver.subprocess.Stdout = os.Stdout
driver.subprocess.Stderr = os.Stderr
driver.subprocess.ExtraFiles = []*os.File{childSocket}
if err = driver.subprocess.Start(); err != nil {
driver.Stop()
return err
}
go driver.handleSubprocessExit()
if err = childSocket.Close(); err != nil {
driver.Stop()
return err
}
connection, err := net.FileConn(driver.socket)
if err != nil {
driver.Stop()
return err
}
driver.transport, err = spdy.NewClientTransport(connection)
if err != nil {
driver.Stop()
return err
}
driver.sender, err = driver.transport.NewSendChannel()
if err != nil {
driver.Stop()
return err
}
// Check the driver's version to determine compatibility
receiver, remoteSender := libchan.Pipe()
err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender})
if err != nil {
driver.Stop()
return err
}
var response VersionResponse
err = receiver.Receive(&response)
if err != nil {
driver.Stop()
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
driver.version = response.Version
if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() {
return IncompatibleVersionError{driver.version}
}
return nil
}
// Stop stops the child process storage driver
// storagedriver.StorageDriver methods called after Stop will fail
func (driver *StorageDriverClient) Stop() error {
var closeSenderErr, closeTransportErr, closeSocketErr, killErr error
if driver.sender != nil {
closeSenderErr = driver.sender.Close()
}
if driver.transport != nil {
closeTransportErr = driver.transport.Close()
}
if driver.socket != nil {
closeSocketErr = driver.socket.Close()
}
if driver.subprocess != nil {
killErr = driver.subprocess.Process.Kill()
}
if driver.stopChan != nil {
close(driver.stopChan)
}
if closeSenderErr != nil {
return closeSenderErr
} else if closeTransportErr != nil {
return closeTransportErr
} else if closeSocketErr != nil {
return closeSocketErr
}
return killErr
}
// Implement the storagedriver.StorageDriver interface over IPC
// GetContent retrieves the content stored at "path" as a []byte.
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ReadStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
defer response.Reader.Close()
contents, err := ioutil.ReadAll(response.Reader)
if err != nil {
return nil, err
}
return contents, nil
}
// PutContent stores the []byte content at a location designated by "path".
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(WriteStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset}
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ReadStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
return response.Reader, nil
}
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": reader}
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(WriteStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// CurrentSize retrieves the curernt size in bytes of the object at the given
// path.
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
if err := driver.exited(); err != nil {
return 0, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return 0, err
}
response := new(CurrentSizeResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return 0, err
}
if response.Error != nil {
return 0, response.Error.Unwrap()
}
return response.Position, nil
}
// List returns a list of the objects that are direct descendants of the given
// path.
func (driver *StorageDriverClient) List(path string) ([]string, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ListResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
return response.Keys, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(MoveResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (driver *StorageDriverClient) Delete(path string) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(DeleteResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// handleSubprocessExit populates the exit channel until we have explicitly
// stopped the storage driver subprocess
// Requests can select on driver.exitChan and response receiving and not hang if
// the process exits
func (driver *StorageDriverClient) handleSubprocessExit() {
exitErr := driver.subprocess.Wait()
if exitErr == nil {
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
} else {
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
}
driver.exitErr = exitErr
for {
select {
case driver.exitChan <- exitErr:
case <-driver.stopChan:
close(driver.exitChan)
return
}
}
}
// receiveResponse populates the response value with the next result from the
// given receiver, or returns an error if receiving failed or the driver has
// stopped
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
receiveChan := make(chan error, 1)
go func(receiver libchan.Receiver, receiveChan chan<- error) {
receiveChan <- receiver.Receive(response)
}(receiver, receiveChan)
var err error
var ok bool
select {
case err = <-receiveChan:
case err, ok = <-driver.exitChan:
if !ok {
err = driver.exitErr
}
}
return err
}
// exited returns an exit error if the driver has exited or nil otherwise
func (driver *StorageDriverClient) exited() error {
select {
case err, ok := <-driver.exitChan:
if !ok {
return driver.exitErr
}
return err
default:
return nil
}
}

View file

@ -1,148 +0,0 @@
// +build ignore
package ipc
import (
"fmt"
"io"
"reflect"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
)
// StorageDriver is the interface which IPC storage drivers must implement. As external storage
// drivers may be defined to use a different version of the storagedriver.StorageDriver interface,
// we use an additional version check to determine compatiblity.
type StorageDriver interface {
// Version returns the storagedriver.StorageDriver interface version which this storage driver
// implements, which is used to determine driver compatibility
Version() (storagedriver.Version, error)
}
// IncompatibleVersionError is returned when a storage driver is using an incompatible version of
// the storagedriver.StorageDriver api
type IncompatibleVersionError struct {
version storagedriver.Version
}
func (e IncompatibleVersionError) Error() string {
return fmt.Sprintf("Incompatible storage driver version: %s", e.version)
}
// Request defines a remote method call request
// A return value struct is to be sent over the ResponseChannel
type Request struct {
Type string `codec:",omitempty"`
Parameters map[string]interface{} `codec:",omitempty"`
ResponseChannel libchan.Sender `codec:",omitempty"`
}
// ResponseError is a serializable error type.
// The Type and Parameters may be used to reconstruct the same error on the
// client side, falling back to using the Type and Message if this cannot be
// done.
type ResponseError struct {
Type string `codec:",omitempty"`
Message string `codec:",omitempty"`
Parameters map[string]interface{} `codec:",omitempty"`
}
// WrapError wraps an error in a serializable struct containing the error's type
// and message.
func WrapError(err error) *ResponseError {
if err == nil {
return nil
}
v := reflect.ValueOf(err)
re := ResponseError{
Type: v.Type().String(),
Message: err.Error(),
}
if v.Kind() == reflect.Struct {
re.Parameters = make(map[string]interface{})
for i := 0; i < v.NumField(); i++ {
field := v.Type().Field(i)
re.Parameters[field.Name] = v.Field(i).Interface()
}
}
return &re
}
// Unwrap returns the underlying error if it can be reconstructed, or the
// original ResponseError otherwise.
func (err *ResponseError) Unwrap() error {
var errVal reflect.Value
var zeroVal reflect.Value
switch err.Type {
case "storagedriver.PathNotFoundError":
errVal = reflect.ValueOf(&storagedriver.PathNotFoundError{})
case "storagedriver.InvalidOffsetError":
errVal = reflect.ValueOf(&storagedriver.InvalidOffsetError{})
}
if errVal == zeroVal {
return err
}
for k, v := range err.Parameters {
fieldVal := errVal.Elem().FieldByName(k)
if fieldVal == zeroVal {
return err
}
fieldVal.Set(reflect.ValueOf(v))
}
if unwrapped, ok := errVal.Elem().Interface().(error); ok {
return unwrapped
}
return err
}
func (err *ResponseError) Error() string {
return fmt.Sprintf("%s: %s", err.Type, err.Message)
}
// IPC method call response object definitions
// VersionResponse is a response for a Version request
type VersionResponse struct {
Version storagedriver.Version `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// ReadStreamResponse is a response for a ReadStream request
type ReadStreamResponse struct {
Reader io.ReadCloser `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// WriteStreamResponse is a response for a WriteStream request
type WriteStreamResponse struct {
Error *ResponseError `codec:",omitempty"`
}
// CurrentSizeResponse is a response for a CurrentSize request
type CurrentSizeResponse struct {
Position uint64 `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// ListResponse is a response for a List request
type ListResponse struct {
Keys []string `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// MoveResponse is a response for a Move request
type MoveResponse struct {
Error *ResponseError `codec:",omitempty"`
}
// DeleteResponse is a response for a Delete request
type DeleteResponse struct {
Error *ResponseError `codec:",omitempty"`
}

View file

@ -1,178 +0,0 @@
// +build ignore
package ipc
import (
"bytes"
"io"
"io/ioutil"
"net"
"os"
"reflect"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
// StorageDriverServer runs a new IPC server handling requests for the given
// storagedriver.StorageDriver
// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in
// client.go
//
// To create a new out-of-process driver, create a main package which calls StorageDriverServer with
// a storagedriver.StorageDriver
func StorageDriverServer(driver storagedriver.StorageDriver) error {
childSocket := os.NewFile(3, "childSocket")
defer childSocket.Close()
conn, err := net.FileConn(childSocket)
if err != nil {
panic(err)
}
defer conn.Close()
if transport, err := spdy.NewServerTransport(conn); err != nil {
panic(err)
} else {
for {
receiver, err := transport.WaitReceiveChannel()
if err == io.EOF {
return nil
} else if err != nil {
panic(err)
}
go receive(driver, receiver)
}
}
}
// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to
// handle each request
// Requests are expected to be of type ipc.Request as the parameters are unknown until the request
// type is deserialized
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
for {
var request Request
err := receiver.Receive(&request)
if err == io.EOF {
return
} else if err != nil {
panic(err)
}
go handleRequest(driver, request)
}
}
// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go
// Responds to requests using the Request.ResponseChannel
func handleRequest(driver storagedriver.StorageDriver, request Request) {
switch request.Type {
case "Version":
err := request.ResponseChannel.Send(&VersionResponse{Version: storagedriver.CurrentVersion})
if err != nil {
panic(err)
}
case "GetContent":
path, _ := request.Parameters["Path"].(string)
content, err := driver.GetContent(path)
var response ReadStreamResponse
if err != nil {
response = ReadStreamResponse{Error: WrapError(err)}
} else {
response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "PutContent":
path, _ := request.Parameters["Path"].(string)
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
contents, err := ioutil.ReadAll(reader)
defer reader.Close()
if err == nil {
err = driver.PutContent(path, contents)
}
response := WriteStreamResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "ReadStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, err := driver.ReadStream(path, offset)
var response ReadStreamResponse
if err != nil {
response = ReadStreamResponse{Error: WrapError(err)}
} else {
response = ReadStreamResponse{Reader: reader}
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "WriteStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
// Depending on serialization method, Size may be converted to any int/uint type
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.WriteStream(path, offset, size, reader)
response := WriteStreamResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "CurrentSize":
path, _ := request.Parameters["Path"].(string)
position, err := driver.CurrentSize(path)
response := CurrentSizeResponse{
Position: position,
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "List":
path, _ := request.Parameters["Path"].(string)
keys, err := driver.List(path)
response := ListResponse{
Keys: keys,
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Move":
sourcePath, _ := request.Parameters["SourcePath"].(string)
destPath, _ := request.Parameters["DestPath"].(string)
err := driver.Move(sourcePath, destPath)
response := MoveResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Delete":
path, _ := request.Parameters["Path"].(string)
err := driver.Delete(path)
response := DeleteResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
default:
panic(request)
}
}

View file

@ -36,5 +36,5 @@ func init() {
return "" return ""
} }
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) testsuites.RegisterSuite(driverConstructor, skipCheck)
} }

View file

@ -17,7 +17,8 @@ import (
// Hook up gocheck into the "go test" runner. // Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) } func Test(t *testing.T) { check.TestingT(t) }
type S3DriverConstructor func(rootDirectory string) (*Driver, error) var s3DriverConstructor func(rootDirectory string) (*Driver, error)
var skipS3 func() string
func init() { func init() {
accessKey := os.Getenv("AWS_ACCESS_KEY") accessKey := os.Getenv("AWS_ACCESS_KEY")
@ -33,7 +34,7 @@ func init() {
} }
defer os.Remove(root) defer os.Remove(root)
s3DriverConstructor := func(rootDirectory string) (*Driver, error) { s3DriverConstructor = func(rootDirectory string) (*Driver, error) {
encryptBool := false encryptBool := false
if encrypt != "" { if encrypt != "" {
encryptBool, err = strconv.ParseBool(encrypt) encryptBool, err = strconv.ParseBool(encrypt)
@ -74,79 +75,64 @@ func init() {
} }
// Skip S3 storage driver tests if environment variable parameters are not provided // Skip S3 storage driver tests if environment variable parameters are not provided
skipCheck := func() string { skipS3 = func() string {
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" { if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
} }
return "" return ""
} }
driverConstructor := func() (storagedriver.StorageDriver, error) { testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return s3DriverConstructor(root) return s3DriverConstructor(root)
}, skipS3)
}
func TestEmptyRootList(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
} }
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
// s3Constructor := func() (*Driver, error) {
// return s3DriverConstructor(aws.GetRegion(region))
// }
RegisterS3DriverSuite(s3DriverConstructor, skipCheck)
// testsuites.RegisterIPCSuite(driverName, map[string]string{
// "accesskey": accessKey,
// "secretkey": secretKey,
// "region": region.Name,
// "bucket": bucket,
// "encrypt": encrypt,
// }, skipCheck)
// }
}
func RegisterS3DriverSuite(s3DriverConstructor S3DriverConstructor, skipCheck testsuites.SkipCheck) {
check.Suite(&S3DriverSuite{
Constructor: s3DriverConstructor,
SkipCheck: skipCheck,
})
}
type S3DriverSuite struct {
Constructor S3DriverConstructor
testsuites.SkipCheck
}
func (suite *S3DriverSuite) SetUpSuite(c *check.C) {
if reason := suite.SkipCheck(); reason != "" {
c.Skip(reason)
}
}
func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) {
validRoot, err := ioutil.TempDir("", "driver-") validRoot, err := ioutil.TempDir("", "driver-")
c.Assert(err, check.IsNil) if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(validRoot) defer os.Remove(validRoot)
rootedDriver, err := suite.Constructor(validRoot) rootedDriver, err := s3DriverConstructor(validRoot)
c.Assert(err, check.IsNil) if err != nil {
emptyRootDriver, err := suite.Constructor("") t.Fatalf("unexpected error creating rooted driver: %v", err)
c.Assert(err, check.IsNil) }
slashRootDriver, err := suite.Constructor("/")
c.Assert(err, check.IsNil) emptyRootDriver, err := s3DriverConstructor("")
if err != nil {
t.Fatalf("unexpected error creating empty root driver: %v", err)
}
slashRootDriver, err := s3DriverConstructor("/")
if err != nil {
t.Fatalf("unexpected error creating slash root driver: %v", err)
}
filename := "/test" filename := "/test"
contents := []byte("contents") contents := []byte("contents")
ctx := context.Background() ctx := context.Background()
err = rootedDriver.PutContent(ctx, filename, contents) err = rootedDriver.PutContent(ctx, filename, contents)
c.Assert(err, check.IsNil) if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
defer rootedDriver.Delete(ctx, filename) defer rootedDriver.Delete(ctx, filename)
keys, err := emptyRootDriver.List(ctx, "/") keys, err := emptyRootDriver.List(ctx, "/")
for _, path := range keys { for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
}
} }
keys, err = slashRootDriver.List(ctx, "/") keys, err = slashRootDriver.List(ctx, "/")
for _, path := range keys { for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
}
} }
} }

View file

@ -22,9 +22,9 @@ import (
// Test hooks up gocheck into the "go test" runner. // Test hooks up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) } func Test(t *testing.T) { check.TestingT(t) }
// RegisterInProcessSuite registers an in-process storage driver test suite with // RegisterSuite registers an in-process storage driver test suite with
// the go test runner. // the go test runner.
func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { func RegisterSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
check.Suite(&DriverSuite{ check.Suite(&DriverSuite{
Constructor: driverConstructor, Constructor: driverConstructor,
SkipCheck: skipCheck, SkipCheck: skipCheck,
@ -32,39 +32,6 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
}) })
} }
// RegisterIPCSuite registers a storage driver test suite which runs the named
// driver as a child process with the given parameters.
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
panic("ipc testing is disabled for now")
// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
// block before and remove the panic when we phase it back in.
// suite := &DriverSuite{
// Constructor: func() (storagedriver.StorageDriver, error) {
// d, err := ipc.NewDriverClient(driverName, ipcParams)
// if err != nil {
// return nil, err
// }
// err = d.Start()
// if err != nil {
// return nil, err
// }
// return d, nil
// },
// SkipCheck: skipCheck,
// }
// suite.Teardown = func() error {
// if suite.StorageDriver == nil {
// return nil
// }
// driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
// return driverClient.Stop()
// }
// check.Suite(suite)
}
// SkipCheck is a function used to determine if a test suite should be skipped. // SkipCheck is a function used to determine if a test suite should be skipped.
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with // If a SkipCheck returns a non-empty skip reason, the suite is skipped with
// the given reason. // the given reason.
@ -82,9 +49,8 @@ type DriverConstructor func() (storagedriver.StorageDriver, error)
type DriverTeardown func() error type DriverTeardown func() error
// DriverSuite is a gocheck test suite designed to test a // DriverSuite is a gocheck test suite designed to test a
// storagedriver.StorageDriver. // storagedriver.StorageDriver. The intended way to create a DriverSuite is
// The intended way to create a DriverSuite is with RegisterInProcessSuite or // with RegisterSuite.
// RegisterIPCSuite.
type DriverSuite struct { type DriverSuite struct {
Constructor DriverConstructor Constructor DriverConstructor
Teardown DriverTeardown Teardown DriverTeardown
@ -841,10 +807,6 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed // TestConcurrentFileStreams checks that multiple *os.File objects can be passed
// in to WriteStream concurrently without hanging. // in to WriteStream concurrently without hanging.
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
// c.Skip("Need to fix out-of-process concurrency")
// }
numStreams := 32 numStreams := 32
if testing.Short() { if testing.Short() {