pkg/beam/unix.go
Alexander Larsson bf43f17c56 beam: Add simple framing system for UnixConn
This is needed for Send/Recieve to correctly handle borders between
the messages.

The framing uses a single 32bit uint32 length for each frame, of which
the high bit is used to indicate whether the message contains a file
descriptor or not. This is enough to separate out each message sent
and to decide to which message each file descriptors belongs, even
though multiple Sends may be coalesced into a single read, and/or one
Send can be split into multiple writes.

Docker-DCO-1.1-Signed-off-by: Alexander Larsson <alexl@redhat.com> (github: alexlarsson)
Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
2014-05-01 11:06:01 -07:00

317 lines
7.6 KiB
Go

package beam
import (
"bufio"
"fmt"
"net"
"os"
"syscall"
)
func debugCheckpoint(msg string, args ...interface{}) {
if os.Getenv("DEBUG") == "" {
return
}
os.Stdout.Sync()
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
fmt.Fprintf(tty, msg, args...)
bufio.NewScanner(tty).Scan()
tty.Close()
}
type UnixConn struct {
*net.UnixConn
fds []*os.File
}
// Framing:
// In order to handle framing in Send/Recieve, as these give frame
// boundaries we use a very simple 4 bytes header. It is a big endiand
// uint32 where the high bit is set if the message includes a file
// descriptor. The rest of the uint32 is the length of the next frame.
// We need the bit in order to be able to assign recieved fds to
// the right message, as multiple messages may be coalesced into
// a single recieve operation.
func makeHeader(data []byte, fds []int) ([]byte, error) {
header := make([]byte, 4)
length := uint32(len(data))
if length > 0x7fffffff {
return nil, fmt.Errorf("Data to large")
}
if len(fds) != 0 {
length = length | 0x80000000
}
header[0] = byte((length >> 24) & 0xff)
header[1] = byte((length >> 16) & 0xff)
header[2] = byte((length >> 8) & 0xff)
header[3] = byte((length >> 0) & 0xff)
return header, nil
}
func parseHeader(header []byte) (uint32, bool) {
length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
hasFd := length&0x80000000 != 0
length = length & ^uint32(0x80000000)
return length, hasFd
}
func FileConn(f *os.File) (*UnixConn, error) {
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
}
return &UnixConn{UnixConn: uconn}, nil
}
// Send sends a new message on conn with data and f as payload and
// attachment, respectively.
// On success, f is closed
func (conn *UnixConn) Send(data []byte, f *os.File) error {
{
var fd int = -1
if f != nil {
fd = int(f.Fd())
}
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
}
var fds []int
if f != nil {
fds = append(fds, int(f.Fd()))
}
if err := conn.sendUnix(data, fds...); err != nil {
return err
}
if f != nil {
f.Close()
}
return nil
}
// Receive waits for a new message on conn, and receives its payload
// and attachment, or an error if any.
//
// If more than 1 file descriptor is sent in the message, they are all
// closed except for the first, which is the attachment.
// It is legal for a message to have no attachment or an empty payload.
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
defer func() {
var fd int = -1
if rf != nil {
fd = int(rf.Fd())
}
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
}()
// Read header
header := make([]byte, 4)
nRead := uint32(0)
for nRead < 4 {
n, err := conn.receiveUnix(header[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
length, hasFd := parseHeader(header)
if hasFd {
if len(conn.fds) == 0 {
return nil, nil, fmt.Errorf("No expected file descriptor in message")
}
rf = conn.fds[0]
conn.fds = conn.fds[1:]
}
rdata = make([]byte, length)
nRead = 0
for nRead < length {
n, err := conn.receiveUnix(rdata[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
return
}
func (conn *UnixConn) receiveUnix(buf []byte) (int, error) {
oob := make([]byte, syscall.CmsgSpace(4))
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
if err != nil {
return 0, err
}
fd := extractFd(oob[:oobn])
if fd != -1 {
f := os.NewFile(uintptr(fd), "")
conn.fds = append(conn.fds, f)
}
return bufn, nil
}
func (conn *UnixConn) sendUnix(data []byte, fds ...int) error {
header, err := makeHeader(data, fds)
if err != nil {
return err
}
// There is a bug in conn.WriteMsgUnix where it doesn't correctly return
// the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645)
// So, we can't rely on the return value from it. However, we must use it to
// send the fds. In order to handle this we only write one byte using WriteMsgUnix
// (when we have to), as that can only ever block or fully suceed. We then write
// the rest with conn.Write()
// The reader side should not rely on this though, as hopefully this gets fixed
// in go later.
written := 0
if len(fds) != 0 {
oob := syscall.UnixRights(fds...)
wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil)
if err != nil {
return err
}
written = written + wrote
}
for written < len(header) {
wrote, err := conn.Write(header[written:])
if err != nil {
return err
}
written = written + wrote
}
written = 0
for written < len(data) {
wrote, err := conn.Write(data[written:])
if err != nil {
return err
}
written = written + wrote
}
return nil
}
func extractFd(oob []byte) int {
// Grab forklock to make sure no forks accidentally inherit the new
// fds before they are made CLOEXEC
// There is a slight race condition between ReadMsgUnix returns and
// when we grap the lock, so this is not perfect. Unfortunately
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
// way to implement non-blocking i/o in go, so this is hard to fix.
syscall.ForkLock.Lock()
defer syscall.ForkLock.Unlock()
scms, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return -1
}
foundFd := -1
for _, scm := range scms {
fds, err := syscall.ParseUnixRights(&scm)
if err != nil {
continue
}
for _, fd := range fds {
if foundFd == -1 {
syscall.CloseOnExec(fd)
foundFd = fd
} else {
syscall.Close(fd)
}
}
}
return foundFd
}
func socketpair() ([2]int, error) {
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
}
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
// not bound to the underlying filesystem.
// Messages sent on one end are received on the other, and vice-versa.
// It is the caller's responsibility to close both ends.
func SocketPair() (a *os.File, b *os.File, err error) {
defer func() {
var (
fdA int = -1
fdB int = -1
)
if a != nil {
fdA = int(a.Fd())
}
if b != nil {
fdB = int(b.Fd())
}
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
}()
pair, err := socketpair()
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
}
func USocketPair() (*UnixConn, *UnixConn, error) {
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
a, b, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer a.Close()
defer b.Close()
uA, err := FileConn(a)
if err != nil {
return nil, nil, err
}
uB, err := FileConn(b)
if err != nil {
uA.Close()
return nil, nil, err
}
return uA, uB, nil
}
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
// returns an error if the file descriptor does not point to a unix socket.
// This creates a duplicate file descriptor. It's the caller's responsibility
// to close both.
func FdConn(fd int) (n *net.UnixConn, err error) {
{
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
}
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", fd)
}
return uconn, nil
}