Adds storage driver interface, tests, and two basic implementations

This commit is contained in:
Brian Bland 2014-10-21 15:02:20 -07:00
parent 12e68998e1
commit 3f95694180
12 changed files with 1320 additions and 0 deletions

285
storagedriver/ipc/client.go Normal file
View file

@ -0,0 +1,285 @@
package ipc
import (
"encoding/json"
"io"
"net"
"os"
"os/exec"
"path"
"syscall"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
type StorageDriverClient struct {
subprocess *exec.Cmd
socket *os.File
transport *spdy.Transport
sender libchan.Sender
}
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
paramsBytes, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
driverPath := os.ExpandEnv(path.Join("$GOPATH", "bin", name))
if _, err := os.Stat(driverPath); os.IsNotExist(err) {
driverPath = path.Join(path.Dir(os.Args[0]), name)
}
if _, err := os.Stat(driverPath); os.IsNotExist(err) {
driverPath, err = exec.LookPath(name)
if err != nil {
return nil, err
}
}
command := exec.Command(driverPath, string(paramsBytes))
return &StorageDriverClient{
subprocess: command,
}, nil
}
func (driver *StorageDriverClient) Start() error {
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
if err != nil {
return err
}
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
parentSocket := os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
driver.subprocess.Stdout = os.Stdout
driver.subprocess.Stderr = os.Stderr
driver.subprocess.ExtraFiles = []*os.File{childSocket}
if err = driver.subprocess.Start(); err != nil {
parentSocket.Close()
return err
}
if err = childSocket.Close(); err != nil {
parentSocket.Close()
return err
}
connection, err := net.FileConn(parentSocket)
if err != nil {
parentSocket.Close()
return err
}
transport, err := spdy.NewClientTransport(connection)
if err != nil {
parentSocket.Close()
return err
}
sender, err := transport.NewSendChannel()
if err != nil {
transport.Close()
parentSocket.Close()
return err
}
driver.socket = parentSocket
driver.transport = transport
driver.sender = sender
return nil
}
func (driver *StorageDriverClient) Stop() error {
closeSenderErr := driver.sender.Close()
closeTransportErr := driver.transport.Close()
closeSocketErr := driver.socket.Close()
killErr := driver.subprocess.Process.Kill()
if closeSenderErr != nil {
return closeSenderErr
} else if closeTransportErr != nil {
return closeTransportErr
} else if closeSocketErr != nil {
return closeSocketErr
}
return killErr
}
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
var response GetContentResponse
err = receiver.Receive(&response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error
}
return response.Content, nil
}
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Contents": contents}
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
var response PutContentResponse
err = receiver.Receive(&response)
if err != nil {
panic(err)
return err
}
if response.Error != nil {
return response.Error
}
return nil
}
func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset}
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
var response ReadStreamResponse
err = receiver.Receive(&response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error
}
return response.Reader, nil
}
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)}
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
var response WriteStreamResponse
err = receiver.Receive(&response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error
}
return nil
}
func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, error) {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "ResumeWritePosition", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return 0, err
}
var response ResumeWritePositionResponse
err = receiver.Receive(&response)
if err != nil {
return 0, err
}
if response.Error != nil {
return 0, response.Error
}
return response.Position, nil
}
func (driver *StorageDriverClient) List(prefix string) ([]string, error) {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Prefix": prefix}
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
var response ListResponse
err = receiver.Receive(&response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error
}
return response.Keys, nil
}
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
var response MoveResponse
err = receiver.Receive(&response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error
}
return nil
}
func (driver *StorageDriverClient) Delete(path string) error {
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
var response DeleteResponse
err = receiver.Receive(&response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error
}
return nil
}

83
storagedriver/ipc/ipc.go Normal file
View file

@ -0,0 +1,83 @@
package ipc
import (
"errors"
"fmt"
"io"
"reflect"
"github.com/docker/libchan"
)
type Request struct {
Type string
Parameters map[string]interface{}
ResponseChannel libchan.Sender
}
type noWriteReadWriteCloser struct {
io.ReadCloser
}
func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) {
return 0, errors.New("Write unsupported")
}
func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser {
return noWriteReadWriteCloser{readCloser}
}
type responseError struct {
Type string
Message string
}
func ResponseError(err error) *responseError {
if err == nil {
return nil
}
return &responseError{
Type: reflect.TypeOf(err).String(),
Message: err.Error(),
}
}
func (err *responseError) Error() string {
return fmt.Sprintf("%s: %s", err.Type, err.Message)
}
type GetContentResponse struct {
Content []byte
Error *responseError
}
type PutContentResponse struct {
Error *responseError
}
type ReadStreamResponse struct {
Reader io.ReadWriteCloser
Error *responseError
}
type WriteStreamResponse struct {
Error *responseError
}
type ResumeWritePositionResponse struct {
Position uint64
Error *responseError
}
type ListResponse struct {
Keys []string
Error *responseError
}
type MoveResponse struct {
Error *responseError
}
type DeleteResponse struct {
Error *responseError
}

160
storagedriver/ipc/server.go Normal file
View file

@ -0,0 +1,160 @@
package ipc
import (
"io"
"net"
"os"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
func Server(driver storagedriver.StorageDriver) error {
childSocket := os.NewFile(3, "childSocket")
defer childSocket.Close()
conn, err := net.FileConn(childSocket)
if err != nil {
panic(err)
}
defer conn.Close()
if transport, err := spdy.NewServerTransport(conn); err != nil {
panic(err)
} else {
for {
receiver, err := transport.WaitReceiveChannel()
if err != nil {
panic(err)
}
go receive(driver, receiver)
}
return nil
}
}
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
for {
var request Request
err := receiver.Receive(&request)
if err != nil {
panic(err)
}
go handleRequest(driver, request)
}
}
func handleRequest(driver storagedriver.StorageDriver, request Request) {
switch request.Type {
case "GetContent":
path, _ := request.Parameters["Path"].(string)
content, err := driver.GetContent(path)
response := GetContentResponse{
Content: content,
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "PutContent":
path, _ := request.Parameters["Path"].(string)
contents, _ := request.Parameters["Contents"].([]byte)
err := driver.PutContent(path, contents)
response := PutContentResponse{
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "ReadStream":
var offset uint64
path, _ := request.Parameters["Path"].(string)
offset, ok := request.Parameters["Offset"].(uint64)
if !ok {
offsetSigned, _ := request.Parameters["Offset"].(int64)
offset = uint64(offsetSigned)
}
reader, err := driver.ReadStream(path, offset)
var response ReadStreamResponse
if err != nil {
response = ReadStreamResponse{Error: ResponseError(err)}
} else {
response = ReadStreamResponse{Reader: WrapReadCloser(reader)}
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "WriteStream":
var offset uint64
path, _ := request.Parameters["Path"].(string)
offset, ok := request.Parameters["Offset"].(uint64)
if !ok {
offsetSigned, _ := request.Parameters["Offset"].(int64)
offset = uint64(offsetSigned)
}
size, ok := request.Parameters["Size"].(uint64)
if !ok {
sizeSigned, _ := request.Parameters["Size"].(int64)
size = uint64(sizeSigned)
}
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.WriteStream(path, offset, size, reader)
response := WriteStreamResponse{
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "ResumeWritePosition":
path, _ := request.Parameters["Path"].(string)
position, err := driver.ResumeWritePosition(path)
response := ResumeWritePositionResponse{
Position: position,
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "List":
prefix, _ := request.Parameters["Prefix"].(string)
keys, err := driver.List(prefix)
response := ListResponse{
Keys: keys,
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Move":
sourcePath, _ := request.Parameters["SourcePath"].(string)
destPath, _ := request.Parameters["DestPath"].(string)
err := driver.Move(sourcePath, destPath)
response := MoveResponse{
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Delete":
path, _ := request.Parameters["Path"].(string)
err := driver.Delete(path)
response := DeleteResponse{
Error: ResponseError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
default:
panic(request)
}
}