remove unused beam, will be back later as libchan
Docker-DCO-1.1-Signed-off-by: Victor Vieux <vieux@docker.com> (github: vieux)
This commit is contained in:
parent
b8d9b7615f
commit
dd675e63ba
28 changed files with 0 additions and 2695 deletions
|
@ -1 +0,0 @@
|
|||
Solomon Hykes <solomon@docker.com> (@shykes)
|
166
beam/beam.go
166
beam/beam.go
|
@ -1,166 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Sender interface {
|
||||
Send([]byte, *os.File) error
|
||||
}
|
||||
|
||||
type Receiver interface {
|
||||
Receive() ([]byte, *os.File, error)
|
||||
}
|
||||
|
||||
type ReceiveCloser interface {
|
||||
Receiver
|
||||
Close() error
|
||||
}
|
||||
|
||||
type SendCloser interface {
|
||||
Sender
|
||||
Close() error
|
||||
}
|
||||
|
||||
type ReceiveSender interface {
|
||||
Receiver
|
||||
Sender
|
||||
}
|
||||
|
||||
const (
|
||||
R = iota
|
||||
W
|
||||
)
|
||||
|
||||
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
|
||||
r, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
remote *os.File
|
||||
local *os.File
|
||||
)
|
||||
if mode == R {
|
||||
remote = r
|
||||
local = w
|
||||
} else if mode == W {
|
||||
remote = w
|
||||
local = r
|
||||
}
|
||||
if err := dst.Send(data, remote); err != nil {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
return nil, err
|
||||
}
|
||||
return local, nil
|
||||
|
||||
}
|
||||
|
||||
// SendRPipe create a pipe and sends its *read* end attached in a beam message
|
||||
// to `dst`, with `data` as the message payload.
|
||||
// It returns the *write* end of the pipe, or an error.
|
||||
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
|
||||
return sendPipe(dst, data, R)
|
||||
}
|
||||
|
||||
// SendWPipe create a pipe and sends its *read* end attached in a beam message
|
||||
// to `dst`, with `data` as the message payload.
|
||||
// It returns the *write* end of the pipe, or an error.
|
||||
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
|
||||
return sendPipe(dst, data, W)
|
||||
}
|
||||
|
||||
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
|
||||
local, remote, err := SocketPair()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}
|
||||
}()
|
||||
conn, err = FileConn(local)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
local.Close()
|
||||
if err := dst.Send(data, remote); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
|
||||
for {
|
||||
data, f, err := src.Receive()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if f == nil {
|
||||
// Skip empty attachments
|
||||
continue
|
||||
}
|
||||
conn, err := FileConn(f)
|
||||
if err != nil {
|
||||
// Skip beam attachments which are not connections
|
||||
// (for example might be a regular file, directory etc)
|
||||
continue
|
||||
}
|
||||
return data, conn, nil
|
||||
}
|
||||
panic("impossibru!")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func Copy(dst Sender, src Receiver) (int, error) {
|
||||
var n int
|
||||
for {
|
||||
payload, attachment, err := src.Receive()
|
||||
if err == io.EOF {
|
||||
return n, nil
|
||||
} else if err != nil {
|
||||
return n, err
|
||||
}
|
||||
if err := dst.Send(payload, attachment); err != nil {
|
||||
if attachment != nil {
|
||||
attachment.Close()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
n++
|
||||
}
|
||||
panic("impossibru!")
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// MsgDesc returns a human readable description of a beam message, usually
|
||||
// for debugging purposes.
|
||||
func MsgDesc(payload []byte, attachment *os.File) string {
|
||||
var filedesc string = "<nil>"
|
||||
if attachment != nil {
|
||||
filedesc = fmt.Sprintf("%d", attachment.Fd())
|
||||
}
|
||||
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
|
||||
}
|
||||
|
||||
type devnull struct{}
|
||||
|
||||
func Devnull() ReceiveSender {
|
||||
return devnull{}
|
||||
}
|
||||
|
||||
func (d devnull) Send(p []byte, a *os.File) error {
|
||||
if a != nil {
|
||||
a.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d devnull) Receive() ([]byte, *os.File, error) {
|
||||
return nil, nil, io.EOF
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"github.com/dotcloud/docker/pkg/beam/data"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSendConn(t *testing.T) {
|
||||
a, b, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
go func() {
|
||||
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn.CloseWrite()
|
||||
}()
|
||||
payload, conn, err := ReceiveConn(b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
|
||||
t.Fatalf("%v != %v\n", val, "connection")
|
||||
}
|
||||
msg, _, err := conn.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
|
||||
t.Fatalf("%v != %v\n", val, "bar")
|
||||
}
|
||||
}
|
|
@ -1,115 +0,0 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Encode(obj map[string][]string) string {
|
||||
var msg string
|
||||
msg += encodeHeader(0)
|
||||
for k, values := range obj {
|
||||
msg += encodeNamedList(k, values)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func encodeHeader(msgtype int) string {
|
||||
return fmt.Sprintf("%03.3d;", msgtype)
|
||||
}
|
||||
|
||||
func encodeString(s string) string {
|
||||
return fmt.Sprintf("%d:%s,", len(s), s)
|
||||
}
|
||||
|
||||
var EncodeString = encodeString
|
||||
var DecodeString = decodeString
|
||||
|
||||
func encodeList(l []string) string {
|
||||
values := make([]string, 0, len(l))
|
||||
for _, s := range l {
|
||||
values = append(values, encodeString(s))
|
||||
}
|
||||
return encodeString(strings.Join(values, ""))
|
||||
}
|
||||
|
||||
func encodeNamedList(name string, l []string) string {
|
||||
return encodeString(name) + encodeList(l)
|
||||
}
|
||||
|
||||
func Decode(msg string) (map[string][]string, error) {
|
||||
msgtype, skip, err := decodeHeader(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msgtype != 0 {
|
||||
// FIXME: use special error type so the caller can easily ignore
|
||||
return nil, fmt.Errorf("unknown message type: %d", msgtype)
|
||||
}
|
||||
msg = msg[skip:]
|
||||
obj := make(map[string][]string)
|
||||
for len(msg) > 0 {
|
||||
k, skip, err := decodeString(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg = msg[skip:]
|
||||
values, skip, err := decodeList(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg = msg[skip:]
|
||||
obj[k] = values
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func decodeList(msg string) ([]string, int, error) {
|
||||
blob, skip, err := decodeString(msg)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
var l []string
|
||||
for len(blob) > 0 {
|
||||
v, skipv, err := decodeString(blob)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
l = append(l, v)
|
||||
blob = blob[skipv:]
|
||||
}
|
||||
return l, skip, nil
|
||||
}
|
||||
|
||||
func decodeString(msg string) (string, int, error) {
|
||||
parts := strings.SplitN(msg, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
return "", 0, fmt.Errorf("invalid format: no column")
|
||||
}
|
||||
var length int
|
||||
if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil {
|
||||
return "", 0, err
|
||||
} else {
|
||||
length = int(l)
|
||||
}
|
||||
if len(parts[1]) < length+1 {
|
||||
return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1)
|
||||
}
|
||||
payload := parts[1][:length+1]
|
||||
if payload[length] != ',' {
|
||||
return "", 0, fmt.Errorf("message is not comma-terminated")
|
||||
}
|
||||
return payload[:length], len(parts[0]) + 1 + length + 1, nil
|
||||
}
|
||||
|
||||
func decodeHeader(msg string) (int, int, error) {
|
||||
if len(msg) < 4 {
|
||||
return 0, 0, fmt.Errorf("message too small")
|
||||
}
|
||||
msgtype, err := strconv.ParseInt(msg[:3], 10, 32)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
return int(msgtype), 4, nil
|
||||
}
|
|
@ -1,129 +0,0 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEncodeHelloWorld(t *testing.T) {
|
||||
input := "hello world!"
|
||||
output := encodeString(input)
|
||||
expectedOutput := "12:hello world!,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeEmptyString(t *testing.T) {
|
||||
input := ""
|
||||
output := encodeString(input)
|
||||
expectedOutput := "0:,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeEmptyList(t *testing.T) {
|
||||
input := []string{}
|
||||
output := encodeList(input)
|
||||
expectedOutput := "0:,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeEmptyMap(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode1Key1Value(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
input["hello"] = []string{"world"}
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;5:hello,8:5:world,,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode1Key2Value(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
input["hello"] = []string{"beautiful", "world"}
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;5:hello,20:9:beautiful,5:world,,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeEmptyValue(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
input["foo"] = []string{}
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;3:foo,0:,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeBinaryKey(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
input["foo\x00bar\x7f"] = []string{}
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;8:foo\x00bar\x7f,0:,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeBinaryValue(t *testing.T) {
|
||||
input := make(map[string][]string)
|
||||
input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"}
|
||||
output := Encode(input)
|
||||
expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeString(t *testing.T) {
|
||||
validEncodedStrings := []struct {
|
||||
input string
|
||||
output string
|
||||
skip int
|
||||
}{
|
||||
{"3:foo,", "foo", 6},
|
||||
{"5:hello,", "hello", 8},
|
||||
{"5:hello,5:world,", "hello", 8},
|
||||
}
|
||||
for _, sample := range validEncodedStrings {
|
||||
output, skip, err := decodeString(sample.input)
|
||||
if err != nil {
|
||||
t.Fatalf("error decoding '%v': %v", sample.input, err)
|
||||
}
|
||||
if skip != sample.skip {
|
||||
t.Fatalf("invalid skip: %v!=%v", skip, sample.skip)
|
||||
}
|
||||
if output != sample.output {
|
||||
t.Fatalf("invalid output: %v!=%v", output, sample.output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecode1Key1Value(t *testing.T) {
|
||||
input := "000;3:foo,6:3:bar,,"
|
||||
output, err := Decode(input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v, exists := output["foo"]; !exists {
|
||||
t.Fatalf("wrong output: %v\n", output)
|
||||
} else if len(v) != 1 || strings.Join(v, "") != "bar" {
|
||||
t.Fatalf("wrong output: %v\n", output)
|
||||
}
|
||||
}
|
|
@ -1,103 +0,0 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Message string
|
||||
|
||||
func Empty() Message {
|
||||
return Message(Encode(nil))
|
||||
}
|
||||
|
||||
func Parse(args []string) Message {
|
||||
data := make(map[string][]string)
|
||||
for _, word := range args {
|
||||
if strings.Contains(word, "=") {
|
||||
kv := strings.SplitN(word, "=", 2)
|
||||
key := kv[0]
|
||||
var val string
|
||||
if len(kv) == 2 {
|
||||
val = kv[1]
|
||||
}
|
||||
data[key] = []string{val}
|
||||
}
|
||||
}
|
||||
return Message(Encode(data))
|
||||
}
|
||||
|
||||
func (m Message) Add(k, v string) Message {
|
||||
data, err := Decode(string(m))
|
||||
if err != nil {
|
||||
return m
|
||||
}
|
||||
if values, exists := data[k]; exists {
|
||||
data[k] = append(values, v)
|
||||
} else {
|
||||
data[k] = []string{v}
|
||||
}
|
||||
return Message(Encode(data))
|
||||
}
|
||||
|
||||
func (m Message) Set(k string, v ...string) Message {
|
||||
data, err := Decode(string(m))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return m
|
||||
}
|
||||
data[k] = v
|
||||
return Message(Encode(data))
|
||||
}
|
||||
|
||||
func (m Message) Del(k string) Message {
|
||||
data, err := Decode(string(m))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return m
|
||||
}
|
||||
delete(data, k)
|
||||
return Message(Encode(data))
|
||||
}
|
||||
|
||||
func (m Message) Get(k string) []string {
|
||||
data, err := Decode(string(m))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
v, exists := data[k]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// GetOne returns the last value added at the key k,
|
||||
// or an empty string if there is no value.
|
||||
func (m Message) GetOne(k string) string {
|
||||
var v string
|
||||
if vals := m.Get(k); len(vals) > 0 {
|
||||
v = vals[len(vals)-1]
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (m Message) Pretty() string {
|
||||
data, err := Decode(string(m))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
entries := make([]string, 0, len(data))
|
||||
for k, values := range data {
|
||||
entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ",")))
|
||||
}
|
||||
return strings.Join(entries, " ")
|
||||
}
|
||||
|
||||
func (m Message) String() string {
|
||||
return string(m)
|
||||
}
|
||||
|
||||
func (m Message) Bytes() []byte {
|
||||
return []byte(m)
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEmptyMessage(t *testing.T) {
|
||||
m := Empty()
|
||||
if m.String() != Encode(nil) {
|
||||
t.Fatalf("%v != %v", m.String(), Encode(nil))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetMessage(t *testing.T) {
|
||||
m := Empty().Set("foo", "bar")
|
||||
output := m.String()
|
||||
expectedOutput := "000;3:foo,6:3:bar,,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
decodedOutput, err := Decode(output)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(decodedOutput) != 1 {
|
||||
t.Fatalf("wrong output data: %#v\n", decodedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetMessageTwice(t *testing.T) {
|
||||
m := Empty().Set("foo", "bar").Set("ga", "bu")
|
||||
output := m.String()
|
||||
expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,,"
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
decodedOutput, err := Decode(output)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(decodedOutput) != 2 {
|
||||
t.Fatalf("wrong output data: %#v\n", decodedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetDelMessage(t *testing.T) {
|
||||
m := Empty().Set("foo", "bar").Del("foo")
|
||||
output := m.String()
|
||||
expectedOutput := Encode(nil)
|
||||
if output != expectedOutput {
|
||||
t.Fatalf("'%v' != '%v'", output, expectedOutput)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetOne(t *testing.T) {
|
||||
m := Empty().Set("shadok words", "ga", "bu", "zo", "meu")
|
||||
val := m.GetOne("shadok words")
|
||||
if val != "meu" {
|
||||
t.Fatalf("%#v", val)
|
||||
}
|
||||
}
|
|
@ -1,92 +0,0 @@
|
|||
##
|
||||
## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt
|
||||
##
|
||||
|
||||
Netstrings
|
||||
D. J. Bernstein, djb@pobox.com
|
||||
19970201
|
||||
|
||||
|
||||
1. Introduction
|
||||
|
||||
A netstring is a self-delimiting encoding of a string. Netstrings are
|
||||
very easy to generate and to parse. Any string may be encoded as a
|
||||
netstring; there are no restrictions on length or on allowed bytes.
|
||||
Another virtue of a netstring is that it declares the string size up
|
||||
front. Thus an application can check in advance whether it has enough
|
||||
space to store the entire string.
|
||||
|
||||
Netstrings may be used as a basic building block for reliable network
|
||||
protocols. Most high-level protocols, in effect, transmit a sequence
|
||||
of strings; those strings may be encoded as netstrings and then
|
||||
concatenated into a sequence of characters, which in turn may be
|
||||
transmitted over a reliable stream protocol such as TCP.
|
||||
|
||||
Note that netstrings can be used recursively. The result of encoding
|
||||
a sequence of strings is a single string. A series of those encoded
|
||||
strings may in turn be encoded into a single string. And so on.
|
||||
|
||||
In this document, a string of 8-bit bytes may be written in two
|
||||
different forms: as a series of hexadecimal numbers between angle
|
||||
brackets, or as a sequence of ASCII characters between double quotes.
|
||||
For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of
|
||||
length 12; it is the same as the string "hello world!".
|
||||
|
||||
Although this document restricts attention to strings of 8-bit bytes,
|
||||
netstrings could be used with any 6-bit-or-larger character set.
|
||||
|
||||
|
||||
2. Definition
|
||||
|
||||
Any string of 8-bit bytes may be encoded as [len]":"[string]",".
|
||||
Here [string] is the string and [len] is a nonempty sequence of ASCII
|
||||
digits giving the length of [string] in decimal. The ASCII digits are
|
||||
<30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros
|
||||
at the front of [len] are prohibited: [len] begins with <30> exactly
|
||||
when [string] is empty.
|
||||
|
||||
For example, the string "hello world!" is encoded as <31 32 3a 68
|
||||
65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The
|
||||
empty string is encoded as "0:,".
|
||||
|
||||
[len]":"[string]"," is called a netstring. [string] is called the
|
||||
interpretation of the netstring.
|
||||
|
||||
|
||||
3. Sample code
|
||||
|
||||
The following C code starts with a buffer buf of length len and
|
||||
prints it as a netstring.
|
||||
|
||||
if (printf("%lu:",len) < 0) barf();
|
||||
if (fwrite(buf,1,len,stdout) < len) barf();
|
||||
if (putchar(',') < 0) barf();
|
||||
|
||||
The following C code reads a netstring and decodes it into a
|
||||
dynamically allocated buffer buf of length len.
|
||||
|
||||
if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */
|
||||
if (getchar() != ':') barf();
|
||||
buf = malloc(len + 1); /* malloc(0) is not portable */
|
||||
if (!buf) barf();
|
||||
if (fread(buf,1,len,stdin) < len) barf();
|
||||
if (getchar() != ',') barf();
|
||||
|
||||
Both of these code fragments assume that the local character set is
|
||||
ASCII, and that the relevant stdio streams are in binary mode.
|
||||
|
||||
|
||||
4. Security considerations
|
||||
|
||||
The famous Finger security hole may be blamed on Finger's use of the
|
||||
CRLF encoding. In that encoding, each string is simply terminated by
|
||||
CRLF. This encoding has several problems. Most importantly, it does
|
||||
not declare the string size in advance. This means that a correct
|
||||
CRLF parser must be prepared to ask for more and more memory as it is
|
||||
reading the string. In the case of Finger, a lazy implementor found
|
||||
this to be too much trouble; instead he simply declared a fixed-size
|
||||
buffer and used C's gets() function. The rest is history.
|
||||
|
||||
In contrast, as the above sample code shows, it is very easy to
|
||||
handle netstrings without risking buffer overflow. Thus widespread
|
||||
use of netstrings may improve network security.
|
Binary file not shown.
|
@ -1,542 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/dotcloud/docker/pkg/beam"
|
||||
"github.com/dotcloud/docker/pkg/beam/data"
|
||||
"github.com/dotcloud/docker/pkg/dockerscript"
|
||||
"github.com/dotcloud/docker/pkg/term"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var rootPlugins = []string{
|
||||
"stdio",
|
||||
}
|
||||
|
||||
var (
|
||||
flX bool
|
||||
flPing bool
|
||||
introspect beam.ReceiveSender = beam.Devnull()
|
||||
)
|
||||
|
||||
func main() {
|
||||
fd3 := os.NewFile(3, "beam-introspect")
|
||||
if introsp, err := beam.FileConn(fd3); err == nil {
|
||||
introspect = introsp
|
||||
Logf("introspection enabled\n")
|
||||
} else {
|
||||
Logf("introspection disabled\n")
|
||||
}
|
||||
fd3.Close()
|
||||
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
|
||||
flag.Parse()
|
||||
if flag.NArg() == 0 {
|
||||
if term.IsTerminal(0) {
|
||||
// No arguments, stdin is terminal --> interactive mode
|
||||
input := bufio.NewScanner(os.Stdin)
|
||||
for {
|
||||
fmt.Printf("[%d] beamsh> ", os.Getpid())
|
||||
if !input.Scan() {
|
||||
break
|
||||
}
|
||||
line := input.Text()
|
||||
if len(line) != 0 {
|
||||
cmd, err := dockerscript.Parse(strings.NewReader(line))
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
if err := executeRootScript(cmd); err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := input.Err(); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No arguments, stdin not terminal --> batch mode
|
||||
script, err := dockerscript.Parse(os.Stdin)
|
||||
if err != nil {
|
||||
Fatal("parse error: %v\n", err)
|
||||
}
|
||||
if err := executeRootScript(script); err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 1+ arguments: parse them as script files
|
||||
for _, scriptpath := range flag.Args() {
|
||||
f, err := os.Open(scriptpath)
|
||||
if err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
script, err := dockerscript.Parse(f)
|
||||
if err != nil {
|
||||
Fatal("parse error: %v\n", err)
|
||||
}
|
||||
if err := executeRootScript(script); err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func executeRootScript(script []*dockerscript.Command) error {
|
||||
if len(rootPlugins) > 0 {
|
||||
// If there are root plugins, wrap the script inside them
|
||||
var (
|
||||
rootCmd *dockerscript.Command
|
||||
lastCmd *dockerscript.Command
|
||||
)
|
||||
for _, plugin := range rootPlugins {
|
||||
pluginCmd := &dockerscript.Command{
|
||||
Args: []string{plugin},
|
||||
}
|
||||
if rootCmd == nil {
|
||||
rootCmd = pluginCmd
|
||||
} else {
|
||||
lastCmd.Children = []*dockerscript.Command{pluginCmd}
|
||||
}
|
||||
lastCmd = pluginCmd
|
||||
}
|
||||
lastCmd.Children = script
|
||||
script = []*dockerscript.Command{rootCmd}
|
||||
}
|
||||
handlers, err := Handlers(introspect)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer handlers.Close()
|
||||
var tasks sync.WaitGroup
|
||||
defer func() {
|
||||
Debugf("Waiting for introspection...\n")
|
||||
tasks.Wait()
|
||||
Debugf("DONE Waiting for introspection\n")
|
||||
}()
|
||||
if introspect != nil {
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
Debugf("starting introspection\n")
|
||||
defer Debugf("done with introspection\n")
|
||||
defer tasks.Done()
|
||||
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
|
||||
Debugf("XXX starting reading introspection messages\n")
|
||||
r := beam.NewRouter(handlers)
|
||||
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
|
||||
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
|
||||
return handlers.Send(p, a)
|
||||
})
|
||||
n, err := beam.Copy(r, introspect)
|
||||
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
|
||||
}()
|
||||
}
|
||||
if err := executeScript(handlers, script); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
|
||||
Debugf("executeScript(%s)\n", scriptString(script))
|
||||
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
|
||||
var background sync.WaitGroup
|
||||
defer background.Wait()
|
||||
for _, cmd := range script {
|
||||
if cmd.Background {
|
||||
background.Add(1)
|
||||
go func(out beam.Sender, cmd *dockerscript.Command) {
|
||||
executeCommand(out, cmd)
|
||||
background.Done()
|
||||
}(out, cmd)
|
||||
} else {
|
||||
if err := executeCommand(out, cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 1) Find a handler for the command (if no handler, fail)
|
||||
// 2) Attach new in & out pair to the handler
|
||||
// 3) [in the background] Copy handler output to our own output
|
||||
// 4) [in the background] Run the handler
|
||||
// 5) Recursively executeScript() all children commands and wait for them to complete
|
||||
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
|
||||
// 7) Profit
|
||||
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
|
||||
if flX {
|
||||
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
|
||||
}
|
||||
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
|
||||
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
|
||||
if len(cmd.Args) == 0 {
|
||||
return fmt.Errorf("empty command")
|
||||
}
|
||||
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
|
||||
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v\n", err)
|
||||
}
|
||||
var tasks sync.WaitGroup
|
||||
tasks.Add(1)
|
||||
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
|
||||
go func() {
|
||||
if out != nil {
|
||||
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
|
||||
n, err := beam.Copy(out, job)
|
||||
if err != nil {
|
||||
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
|
||||
}
|
||||
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
|
||||
}
|
||||
tasks.Done()
|
||||
}()
|
||||
// depth-first execution of children commands
|
||||
// executeScript() blocks until all commands are completed
|
||||
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
|
||||
executeScript(job, cmd.Children)
|
||||
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
|
||||
job.CloseWrite()
|
||||
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
|
||||
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
|
||||
tasks.Wait()
|
||||
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
|
||||
return nil
|
||||
}
|
||||
|
||||
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
|
||||
|
||||
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
|
||||
var tasks sync.WaitGroup
|
||||
pub, priv, err := beam.USocketPair()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
Debugf("[handlers] closewrite() on endpoint\n")
|
||||
// FIXME: this is not yet necessary but will be once
|
||||
// there is synchronization over standard beam messages
|
||||
priv.CloseWrite()
|
||||
Debugf("[handlers] done closewrite() on endpoint\n")
|
||||
}()
|
||||
r := beam.NewRouter(sink)
|
||||
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
|
||||
conn, err := beam.FileConn(attachment)
|
||||
if err != nil {
|
||||
attachment.Close()
|
||||
return err
|
||||
}
|
||||
// attachment.Close()
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
defer tasks.Done()
|
||||
defer func() {
|
||||
Debugf("[handlers] '%s' closewrite\n", payload)
|
||||
conn.CloseWrite()
|
||||
Debugf("[handlers] '%s' done closewrite\n", payload)
|
||||
}()
|
||||
cmd := data.Message(payload).Get("cmd")
|
||||
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
|
||||
if len(cmd) == 0 {
|
||||
return
|
||||
}
|
||||
handler := GetHandler(cmd[0])
|
||||
if handler == nil {
|
||||
return
|
||||
}
|
||||
stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stdout.Close()
|
||||
stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stderr.Close()
|
||||
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
|
||||
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
|
||||
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
beam.Copy(r, priv)
|
||||
Debugf("[handlers] waiting for all tasks\n")
|
||||
tasks.Wait()
|
||||
Debugf("[handlers] all tasks returned\n")
|
||||
}()
|
||||
return pub, nil
|
||||
}
|
||||
|
||||
func GetHandler(name string) Handler {
|
||||
if name == "logger" {
|
||||
return CmdLogger
|
||||
} else if name == "render" {
|
||||
return CmdRender
|
||||
} else if name == "devnull" {
|
||||
return CmdDevnull
|
||||
} else if name == "prompt" {
|
||||
return CmdPrompt
|
||||
} else if name == "stdio" {
|
||||
return CmdStdio
|
||||
} else if name == "echo" {
|
||||
return CmdEcho
|
||||
} else if name == "pass" {
|
||||
return CmdPass
|
||||
} else if name == "in" {
|
||||
return CmdIn
|
||||
} else if name == "exec" {
|
||||
return CmdExec
|
||||
} else if name == "trace" {
|
||||
return CmdTrace
|
||||
} else if name == "emit" {
|
||||
return CmdEmit
|
||||
} else if name == "print" {
|
||||
return CmdPrint
|
||||
} else if name == "multiprint" {
|
||||
return CmdMultiprint
|
||||
} else if name == "listen" {
|
||||
return CmdListen
|
||||
} else if name == "beamsend" {
|
||||
return CmdBeamsend
|
||||
} else if name == "beamreceive" {
|
||||
return CmdBeamreceive
|
||||
} else if name == "connect" {
|
||||
return CmdConnect
|
||||
} else if name == "openfile" {
|
||||
return CmdOpenfile
|
||||
} else if name == "spawn" {
|
||||
return CmdSpawn
|
||||
} else if name == "chdir" {
|
||||
return CmdChdir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// VARIOUS HELPER FUNCTIONS:
|
||||
|
||||
func connToFile(conn net.Conn) (f *os.File, err error) {
|
||||
if connWithFile, ok := conn.(interface {
|
||||
File() (*os.File, error)
|
||||
}); !ok {
|
||||
return nil, fmt.Errorf("no file descriptor available")
|
||||
} else {
|
||||
f, err = connWithFile.File()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return f, err
|
||||
}
|
||||
|
||||
type Msg struct {
|
||||
payload []byte
|
||||
attachment *os.File
|
||||
}
|
||||
|
||||
func Logf(msg string, args ...interface{}) (int, error) {
|
||||
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
|
||||
msg = msg + "\n"
|
||||
}
|
||||
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
|
||||
return fmt.Printf(msg, args...)
|
||||
}
|
||||
|
||||
func Debugf(msg string, args ...interface{}) {
|
||||
if os.Getenv("BEAMDEBUG") != "" {
|
||||
Logf(msg, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func Fatalf(msg string, args ...interface{}) {
|
||||
Logf(msg, args...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
func Fatal(args ...interface{}) {
|
||||
Fatalf("%v", args[0])
|
||||
}
|
||||
|
||||
func scriptString(script []*dockerscript.Command) string {
|
||||
lines := make([]string, 0, len(script))
|
||||
for _, cmd := range script {
|
||||
line := strings.Join(cmd.Args, " ")
|
||||
if len(cmd.Children) > 0 {
|
||||
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
|
||||
} else {
|
||||
line += " {}"
|
||||
}
|
||||
lines = append(lines, line)
|
||||
}
|
||||
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
|
||||
}
|
||||
|
||||
func dialer(addr string) (chan net.Conn, error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
connections := make(chan net.Conn)
|
||||
go func() {
|
||||
defer close(connections)
|
||||
for {
|
||||
conn, err := net.Dial(u.Scheme, u.Host)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
connections <- conn
|
||||
}
|
||||
}()
|
||||
return connections, nil
|
||||
}
|
||||
|
||||
func listener(addr string) (chan net.Conn, error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l, err := net.Listen(u.Scheme, u.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
connections := make(chan net.Conn)
|
||||
go func() {
|
||||
defer close(connections)
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
Logf("new connection\n")
|
||||
connections <- conn
|
||||
}
|
||||
}()
|
||||
return connections, nil
|
||||
}
|
||||
|
||||
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
|
||||
var tasks sync.WaitGroup
|
||||
defer tasks.Wait()
|
||||
for {
|
||||
payload, attachment, err := src.Receive()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
conn, ok := <-connections
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
Logf("Sending %s\n", msgDesc(payload, attachment))
|
||||
tasks.Add(1)
|
||||
go func(payload []byte, attachment *os.File, conn net.Conn) {
|
||||
defer tasks.Done()
|
||||
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
|
||||
return
|
||||
}
|
||||
if attachment == nil {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
var iotasks sync.WaitGroup
|
||||
iotasks.Add(2)
|
||||
go func(attachment *os.File, conn net.Conn) {
|
||||
defer iotasks.Done()
|
||||
Debugf("copying the connection to [%d]\n", attachment.Fd())
|
||||
io.Copy(attachment, conn)
|
||||
attachment.Close()
|
||||
Debugf("done copying the connection to [%d]\n", attachment.Fd())
|
||||
}(attachment, conn)
|
||||
go func(attachment *os.File, conn net.Conn) {
|
||||
defer iotasks.Done()
|
||||
Debugf("copying [%d] to the connection\n", attachment.Fd())
|
||||
io.Copy(conn, attachment)
|
||||
conn.Close()
|
||||
Debugf("done copying [%d] to the connection\n", attachment.Fd())
|
||||
}(attachment, conn)
|
||||
iotasks.Wait()
|
||||
}(payload, attachment, conn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func msgDesc(payload []byte, attachment *os.File) string {
|
||||
return beam.MsgDesc(payload, attachment)
|
||||
}
|
||||
|
||||
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
|
||||
for conn := range connections {
|
||||
err := func() error {
|
||||
Logf("parsing message from network...\n")
|
||||
defer Logf("done parsing message from network\n")
|
||||
buf := make([]byte, 4098)
|
||||
n, err := conn.Read(buf)
|
||||
if n == 0 {
|
||||
conn.Close()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
Logf("decoding message from '%s'\n", buf[:n])
|
||||
header, skip, err := data.DecodeString(string(buf[:n]))
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
pub, priv, err := beam.SocketPair()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Logf("decoded message: %s\n", data.Message(header).Pretty())
|
||||
go func(skipped []byte, conn net.Conn, f *os.File) {
|
||||
// this closes both conn and f
|
||||
if len(skipped) > 0 {
|
||||
if _, err := f.Write(skipped); err != nil {
|
||||
Logf("ERROR: %v\n", err)
|
||||
f.Close()
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
bicopy(conn, f)
|
||||
}(buf[skip:n], conn, pub)
|
||||
if err := dst.Send([]byte(header), priv); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
Logf("Error reading from connection: %v\n", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func bicopy(a, b io.ReadWriteCloser) {
|
||||
var iotasks sync.WaitGroup
|
||||
oneCopy := func(dst io.WriteCloser, src io.Reader) {
|
||||
defer iotasks.Done()
|
||||
io.Copy(dst, src)
|
||||
dst.Close()
|
||||
}
|
||||
iotasks.Add(2)
|
||||
go oneCopy(a, b)
|
||||
go oneCopy(b, a)
|
||||
iotasks.Wait()
|
||||
}
|
|
@ -1,441 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"github.com/dotcloud/docker/pkg/beam"
|
||||
"github.com/dotcloud/docker/pkg/beam/data"
|
||||
"github.com/dotcloud/docker/pkg/term"
|
||||
"github.com/dotcloud/docker/utils"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/template"
|
||||
)
|
||||
|
||||
func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if err := os.MkdirAll("logs", 0700); err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
var tasks sync.WaitGroup
|
||||
defer tasks.Wait()
|
||||
var n int = 1
|
||||
r := beam.NewRouter(out)
|
||||
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
|
||||
tasks.Add(1)
|
||||
go func(n int) {
|
||||
defer tasks.Done()
|
||||
defer attachment.Close()
|
||||
var streamname string
|
||||
if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
|
||||
streamname = "stdout"
|
||||
} else {
|
||||
streamname = cmd[1]
|
||||
}
|
||||
if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
|
||||
streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
|
||||
}
|
||||
logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
|
||||
if err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
defer logfile.Close()
|
||||
io.Copy(logfile, attachment)
|
||||
logfile.Sync()
|
||||
}(n)
|
||||
n++
|
||||
return nil
|
||||
}).Tee(out)
|
||||
if _, err := beam.Copy(r, in); err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) != 2 {
|
||||
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
|
||||
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
|
||||
return
|
||||
}
|
||||
txt := args[1]
|
||||
if !strings.HasSuffix(txt, "\n") {
|
||||
txt += "\n"
|
||||
}
|
||||
t := template.Must(template.New("render").Parse(txt))
|
||||
for {
|
||||
payload, attachment, err := in.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msg, err := data.Decode(string(payload))
|
||||
if err != nil {
|
||||
fmt.Fprintf(stderr, "decode error: %v\n")
|
||||
}
|
||||
if err := t.Execute(stdout, msg); err != nil {
|
||||
fmt.Fprintf(stderr, "rendering error: %v\n", err)
|
||||
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
|
||||
return
|
||||
}
|
||||
if err := out.Send(payload, attachment); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
for {
|
||||
_, attachment, err := in.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if attachment != nil {
|
||||
attachment.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) < 2 {
|
||||
fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
|
||||
return
|
||||
}
|
||||
if !term.IsTerminal(0) {
|
||||
fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
|
||||
return
|
||||
}
|
||||
fmt.Printf("%s: ", strings.Join(args[1:], " "))
|
||||
oldState, _ := term.SaveState(0)
|
||||
term.DisableEcho(0, oldState)
|
||||
line, _, err := bufio.NewReader(os.Stdin).ReadLine()
|
||||
if err != nil {
|
||||
fmt.Fprintln(stderr, err.Error())
|
||||
return
|
||||
}
|
||||
val := string(line)
|
||||
fmt.Printf("\n")
|
||||
term.RestoreTerminal(0, oldState)
|
||||
out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
|
||||
}
|
||||
|
||||
func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
var tasks sync.WaitGroup
|
||||
defer tasks.Wait()
|
||||
|
||||
r := beam.NewRouter(out)
|
||||
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
defer tasks.Done()
|
||||
defer attachment.Close()
|
||||
io.Copy(os.Stdout, attachment)
|
||||
attachment.Close()
|
||||
}()
|
||||
return nil
|
||||
}).Tee(out)
|
||||
|
||||
if _, err := beam.Copy(r, in); err != nil {
|
||||
Fatal(err)
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
fmt.Fprintln(stdout, strings.Join(args[1:], " "))
|
||||
}
|
||||
|
||||
func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
for {
|
||||
payload, attachment, err := in.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := out.Send(payload, attachment); err != nil {
|
||||
if attachment != nil {
|
||||
attachment.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
c := exec.Command(utils.SelfPath())
|
||||
r, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
c.Stdin = r
|
||||
c.Stdout = stdout
|
||||
c.Stderr = stderr
|
||||
go func() {
|
||||
fmt.Fprintf(w, strings.Join(args[1:], " "))
|
||||
w.Sync()
|
||||
w.Close()
|
||||
}()
|
||||
if err := c.Run(); err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
os.Chdir(args[1])
|
||||
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
|
||||
}
|
||||
|
||||
func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
cmd := exec.Command(args[1], args[2:]...)
|
||||
cmd.Stdout = stdout
|
||||
cmd.Stderr = stderr
|
||||
//cmd.Stdin = os.Stdin
|
||||
local, remote, err := beam.SocketPair()
|
||||
if err != nil {
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
child, err := beam.FileConn(local)
|
||||
if err != nil {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
fmt.Fprintf(stderr, "%v\n", err)
|
||||
return
|
||||
}
|
||||
local.Close()
|
||||
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
|
||||
|
||||
var tasks sync.WaitGroup
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
defer Debugf("done copying to child\n")
|
||||
defer tasks.Done()
|
||||
defer child.CloseWrite()
|
||||
beam.Copy(child, in)
|
||||
}()
|
||||
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
defer Debugf("done copying from child %d\n")
|
||||
defer tasks.Done()
|
||||
r := beam.NewRouter(out)
|
||||
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
|
||||
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
|
||||
})
|
||||
beam.Copy(r, child)
|
||||
}()
|
||||
execErr := cmd.Run()
|
||||
// We can close both ends of the socket without worrying about data stuck in the buffer,
|
||||
// because unix socket writes are fully synchronous.
|
||||
child.Close()
|
||||
tasks.Wait()
|
||||
var status string
|
||||
if execErr != nil {
|
||||
status = execErr.Error()
|
||||
} else {
|
||||
status = "ok"
|
||||
}
|
||||
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
|
||||
}
|
||||
|
||||
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
r := beam.NewRouter(out)
|
||||
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
|
||||
var sfd string = "nil"
|
||||
if attachment != nil {
|
||||
sfd = fmt.Sprintf("%d", attachment.Fd())
|
||||
}
|
||||
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
|
||||
out.Send(payload, attachment)
|
||||
return nil
|
||||
})
|
||||
beam.Copy(r, in)
|
||||
}
|
||||
|
||||
func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
out.Send(data.Parse(args[1:]).Bytes(), nil)
|
||||
}
|
||||
|
||||
func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
for {
|
||||
payload, a, err := in.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Skip commands
|
||||
if a != nil && data.Message(payload).Get("cmd") == nil {
|
||||
dup, err := beam.SendRPipe(out, payload)
|
||||
if err != nil {
|
||||
a.Close()
|
||||
return
|
||||
}
|
||||
io.Copy(io.MultiWriter(os.Stdout, dup), a)
|
||||
dup.Close()
|
||||
} else {
|
||||
if err := out.Send(payload, a); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
var tasks sync.WaitGroup
|
||||
defer tasks.Wait()
|
||||
r := beam.NewRouter(out)
|
||||
multiprint := func(p []byte, a *os.File) error {
|
||||
tasks.Add(1)
|
||||
go func() {
|
||||
defer tasks.Done()
|
||||
defer a.Close()
|
||||
msg := data.Message(string(p))
|
||||
input := bufio.NewScanner(a)
|
||||
for input.Scan() {
|
||||
fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
|
||||
r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
|
||||
beam.Copy(r, in)
|
||||
}
|
||||
|
||||
func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) != 2 {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
|
||||
return
|
||||
}
|
||||
u, err := url.Parse(args[1])
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
l, err := net.Listen(u.Scheme, u.Host)
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
f, err := connToFile(conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
|
||||
}
|
||||
}
|
||||
|
||||
func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) < 2 {
|
||||
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
var connector func(string) (chan net.Conn, error)
|
||||
connector = dialer
|
||||
connections, err := connector(args[1])
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
// Copy in to conn
|
||||
SendToConn(connections, in)
|
||||
}
|
||||
|
||||
func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) != 2 {
|
||||
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
|
||||
Fatal(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
var connector func(string) (chan net.Conn, error)
|
||||
connector = listener
|
||||
connections, err := connector(args[1])
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
// Copy in to conn
|
||||
ReceiveFromConn(connections, out)
|
||||
}
|
||||
|
||||
func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
if len(args) != 2 {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
|
||||
return
|
||||
}
|
||||
u, err := url.Parse(args[1])
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
var tasks sync.WaitGroup
|
||||
for {
|
||||
_, attachment, err := in.Receive()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if attachment == nil {
|
||||
continue
|
||||
}
|
||||
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
|
||||
conn, err := net.Dial(u.Scheme, u.Host)
|
||||
if err != nil {
|
||||
out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
|
||||
return
|
||||
}
|
||||
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
|
||||
tasks.Add(1)
|
||||
go func(attachment *os.File, conn net.Conn) {
|
||||
defer tasks.Done()
|
||||
// even when successful, conn.File() returns a duplicate,
|
||||
// so we must close the original
|
||||
var iotasks sync.WaitGroup
|
||||
iotasks.Add(2)
|
||||
go func(attachment *os.File, conn net.Conn) {
|
||||
defer iotasks.Done()
|
||||
io.Copy(attachment, conn)
|
||||
}(attachment, conn)
|
||||
go func(attachment *os.File, conn net.Conn) {
|
||||
defer iotasks.Done()
|
||||
io.Copy(conn, attachment)
|
||||
}(attachment, conn)
|
||||
iotasks.Wait()
|
||||
conn.Close()
|
||||
attachment.Close()
|
||||
}(attachment, conn)
|
||||
}
|
||||
tasks.Wait()
|
||||
}
|
||||
|
||||
func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
for _, name := range args {
|
||||
f, err := os.Open(name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
|
||||
os.Chdir(args[1])
|
||||
}
|
|
@ -1,3 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
exec ls -l
|
|
@ -1,5 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
trace {
|
||||
exec ls -l
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
trace {
|
||||
stdio {
|
||||
exec ls -l
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
#!/usr/bin/env beamsh -x
|
||||
|
||||
trace outer {
|
||||
# stdio fails
|
||||
stdio {
|
||||
trace inner {
|
||||
exec ls -l
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
stdio {
|
||||
trace {
|
||||
stdio {
|
||||
exec ls -l
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
stdio {
|
||||
# exec fails
|
||||
exec ls -l
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
stdio {
|
||||
trace {
|
||||
echo hello
|
||||
}
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
stdio {
|
||||
# exec fails
|
||||
echo hello world
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
devnull {
|
||||
multiprint {
|
||||
exec tail -f /var/log/system.log &
|
||||
exec ls -l
|
||||
exec ls ksdhfkjdshf jksdfhkjsdhf
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
print {
|
||||
trace {
|
||||
emit msg=hello
|
||||
emit msg=world
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
trace {
|
||||
log {
|
||||
exec ls -l
|
||||
exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf
|
||||
echo hello world
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
#!/usr/bin/env beamsh
|
||||
|
||||
multiprint {
|
||||
trace {
|
||||
listen tcp://localhost:7676 &
|
||||
listen tcp://localhost:8787 &
|
||||
}
|
||||
}
|
||||
|
184
beam/router.go
184
beam/router.go
|
@ -1,184 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/dotcloud/docker/pkg/beam/data"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Router struct {
|
||||
routes []*Route
|
||||
sink Sender
|
||||
}
|
||||
|
||||
func NewRouter(sink Sender) *Router {
|
||||
return &Router{sink: sink}
|
||||
}
|
||||
|
||||
func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
|
||||
//fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment))
|
||||
defer func() {
|
||||
//fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err)
|
||||
}()
|
||||
for _, route := range r.routes {
|
||||
if route.Match(payload, attachment) {
|
||||
return route.Handle(payload, attachment)
|
||||
}
|
||||
}
|
||||
if r.sink != nil {
|
||||
// fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink)
|
||||
return r.sink.Send(payload, attachment)
|
||||
}
|
||||
//fmt.Printf("[Router.Send] no match. return error.\n")
|
||||
return fmt.Errorf("no matching route")
|
||||
}
|
||||
|
||||
func (r *Router) NewRoute() *Route {
|
||||
route := &Route{}
|
||||
r.routes = append(r.routes, route)
|
||||
return route
|
||||
}
|
||||
|
||||
type Route struct {
|
||||
rules []func([]byte, *os.File) bool
|
||||
handler func([]byte, *os.File) error
|
||||
}
|
||||
|
||||
func (route *Route) Match(payload []byte, attachment *os.File) bool {
|
||||
for _, rule := range route.rules {
|
||||
if !rule(payload, attachment) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (route *Route) Handle(payload []byte, attachment *os.File) error {
|
||||
if route.handler == nil {
|
||||
return nil
|
||||
}
|
||||
return route.handler(payload, attachment)
|
||||
}
|
||||
|
||||
func (r *Route) HasAttachment() *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
return attachment != nil
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (route *Route) Tee(dst Sender) *Route {
|
||||
inner := route.handler
|
||||
route.handler = func(payload []byte, attachment *os.File) error {
|
||||
if inner == nil {
|
||||
return nil
|
||||
}
|
||||
if attachment == nil {
|
||||
return inner(payload, attachment)
|
||||
}
|
||||
// Setup the tee
|
||||
w, err := SendRPipe(dst, payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
teeR, teeW, err := os.Pipe()
|
||||
if err != nil {
|
||||
w.Close()
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
io.Copy(io.MultiWriter(teeW, w), attachment)
|
||||
attachment.Close()
|
||||
w.Close()
|
||||
teeW.Close()
|
||||
}()
|
||||
return inner(payload, teeR)
|
||||
}
|
||||
return route
|
||||
}
|
||||
|
||||
func (r *Route) Filter(f func([]byte, *os.File) bool) *Route {
|
||||
r.rules = append(r.rules, f)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
values := data.Message(payload).Get(k)
|
||||
if values == nil {
|
||||
return false
|
||||
}
|
||||
if len(values) < len(beginning) {
|
||||
return false
|
||||
}
|
||||
for i, v := range beginning {
|
||||
if v != values[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) KeyEquals(k string, full ...string) *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
values := data.Message(payload).Get(k)
|
||||
if len(values) != len(full) {
|
||||
return false
|
||||
}
|
||||
for i, v := range full {
|
||||
if v != values[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) KeyIncludes(k, v string) *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
for _, val := range data.Message(payload).Get(k) {
|
||||
if val == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) NoKey(k string) *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
return len(data.Message(payload).Get(k)) == 0
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) KeyExists(k string) *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
return data.Message(payload).Get(k) != nil
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) Passthrough(dst Sender) *Route {
|
||||
r.handler = func(payload []byte, attachment *os.File) error {
|
||||
return dst.Send(payload, attachment)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) All() *Route {
|
||||
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
|
||||
return true
|
||||
})
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
|
||||
r.handler = h
|
||||
return r
|
||||
}
|
|
@ -1,95 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type msg struct {
|
||||
payload []byte
|
||||
attachment *os.File
|
||||
}
|
||||
|
||||
func (m msg) String() string {
|
||||
return MsgDesc(m.payload, m.attachment)
|
||||
}
|
||||
|
||||
type mockReceiver []msg
|
||||
|
||||
func (r *mockReceiver) Send(p []byte, a *os.File) error {
|
||||
(*r) = append((*r), msg{p, a})
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSendNoSinkNoRoute(t *testing.T) {
|
||||
r := NewRouter(nil)
|
||||
if err := r.Send([]byte("hello"), nil); err == nil {
|
||||
t.Fatalf("error expected")
|
||||
}
|
||||
a, b, err := os.Pipe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
if err := r.Send([]byte("foo bar baz"), a); err == nil {
|
||||
t.Fatalf("error expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendSinkNoRoute(t *testing.T) {
|
||||
var sink mockReceiver
|
||||
r := NewRouter(&sink)
|
||||
if err := r.Send([]byte("hello"), nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
a, b, err := os.Pipe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
if err := r.Send([]byte("world"), a); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(sink) != 2 {
|
||||
t.Fatalf("%#v\n", sink)
|
||||
}
|
||||
if string(sink[0].payload) != "hello" {
|
||||
t.Fatalf("%#v\n", sink)
|
||||
}
|
||||
if sink[0].attachment != nil {
|
||||
t.Fatalf("%#v\n", sink)
|
||||
}
|
||||
if string(sink[1].payload) != "world" {
|
||||
t.Fatalf("%#v\n", sink)
|
||||
}
|
||||
if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 {
|
||||
t.Fatalf("%v\n", sink)
|
||||
}
|
||||
var tasks sync.WaitGroup
|
||||
tasks.Add(2)
|
||||
go func() {
|
||||
defer tasks.Done()
|
||||
fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd())
|
||||
data, err := ioutil.ReadAll(sink[1].attachment)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(data) != "foo bar\n" {
|
||||
t.Fatalf("%v\n", string(data))
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer tasks.Done()
|
||||
fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd())
|
||||
if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b.Close()
|
||||
}()
|
||||
tasks.Wait()
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
// Listen is a convenience interface for applications to create service endpoints
|
||||
// which can be easily used with existing networking code.
|
||||
//
|
||||
// Listen registers a new service endpoint on the beam connection `conn`, using the
|
||||
// service name `name`. It returns a listener which can be used in the usual
|
||||
// way. Calling Accept() on the listener will block until a new connection is available
|
||||
// on the service endpoint. The endpoint is then returned as a regular net.Conn and
|
||||
// can be used as a regular network connection.
|
||||
//
|
||||
// Note that if the underlying file descriptor received in attachment is nil or does
|
||||
// not point to a connection, that message will be skipped.
|
||||
//
|
||||
func Listen(conn Sender, name string) (net.Listener, error) {
|
||||
endpoint, err := SendConn(conn, []byte(name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &listener{
|
||||
name: name,
|
||||
endpoint: endpoint,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func Connect(ctx *UnixConn, name string) (net.Conn, error) {
|
||||
l, err := Listen(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
type listener struct {
|
||||
name string
|
||||
endpoint ReceiveCloser
|
||||
}
|
||||
|
||||
func (l *listener) Accept() (net.Conn, error) {
|
||||
for {
|
||||
_, f, err := l.endpoint.Receive()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if f == nil {
|
||||
// Skip empty attachments
|
||||
continue
|
||||
}
|
||||
conn, err := net.FileConn(f)
|
||||
if err != nil {
|
||||
// Skip beam attachments which are not connections
|
||||
// (for example might be a regular file, directory etc)
|
||||
continue
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
panic("impossibru!")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (l *listener) Close() error {
|
||||
return l.endpoint.Close()
|
||||
}
|
||||
|
||||
func (l *listener) Addr() net.Addr {
|
||||
return addr(l.name)
|
||||
}
|
||||
|
||||
type addr string
|
||||
|
||||
func (a addr) Network() string {
|
||||
return "beam"
|
||||
}
|
||||
|
||||
func (a addr) String() string {
|
||||
return string(a)
|
||||
}
|
317
beam/unix.go
317
beam/unix.go
|
@ -1,317 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func debugCheckpoint(msg string, args ...interface{}) {
|
||||
if os.Getenv("DEBUG") == "" {
|
||||
return
|
||||
}
|
||||
os.Stdout.Sync()
|
||||
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
|
||||
fmt.Fprintf(tty, msg, args...)
|
||||
bufio.NewScanner(tty).Scan()
|
||||
tty.Close()
|
||||
}
|
||||
|
||||
type UnixConn struct {
|
||||
*net.UnixConn
|
||||
fds []*os.File
|
||||
}
|
||||
|
||||
// Framing:
|
||||
// In order to handle framing in Send/Recieve, as these give frame
|
||||
// boundaries we use a very simple 4 bytes header. It is a big endiand
|
||||
// uint32 where the high bit is set if the message includes a file
|
||||
// descriptor. The rest of the uint32 is the length of the next frame.
|
||||
// We need the bit in order to be able to assign recieved fds to
|
||||
// the right message, as multiple messages may be coalesced into
|
||||
// a single recieve operation.
|
||||
func makeHeader(data []byte, fds []int) ([]byte, error) {
|
||||
header := make([]byte, 4)
|
||||
|
||||
length := uint32(len(data))
|
||||
|
||||
if length > 0x7fffffff {
|
||||
return nil, fmt.Errorf("Data to large")
|
||||
}
|
||||
|
||||
if len(fds) != 0 {
|
||||
length = length | 0x80000000
|
||||
}
|
||||
header[0] = byte((length >> 24) & 0xff)
|
||||
header[1] = byte((length >> 16) & 0xff)
|
||||
header[2] = byte((length >> 8) & 0xff)
|
||||
header[3] = byte((length >> 0) & 0xff)
|
||||
|
||||
return header, nil
|
||||
}
|
||||
|
||||
func parseHeader(header []byte) (uint32, bool) {
|
||||
length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
|
||||
hasFd := length&0x80000000 != 0
|
||||
length = length & ^uint32(0x80000000)
|
||||
|
||||
return length, hasFd
|
||||
}
|
||||
|
||||
func FileConn(f *os.File) (*UnixConn, error) {
|
||||
conn, err := net.FileConn(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uconn, ok := conn.(*net.UnixConn)
|
||||
if !ok {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
|
||||
}
|
||||
return &UnixConn{UnixConn: uconn}, nil
|
||||
|
||||
}
|
||||
|
||||
// Send sends a new message on conn with data and f as payload and
|
||||
// attachment, respectively.
|
||||
// On success, f is closed
|
||||
func (conn *UnixConn) Send(data []byte, f *os.File) error {
|
||||
{
|
||||
var fd int = -1
|
||||
if f != nil {
|
||||
fd = int(f.Fd())
|
||||
}
|
||||
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
|
||||
}
|
||||
var fds []int
|
||||
if f != nil {
|
||||
fds = append(fds, int(f.Fd()))
|
||||
}
|
||||
if err := conn.sendUnix(data, fds...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if f != nil {
|
||||
f.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Receive waits for a new message on conn, and receives its payload
|
||||
// and attachment, or an error if any.
|
||||
//
|
||||
// If more than 1 file descriptor is sent in the message, they are all
|
||||
// closed except for the first, which is the attachment.
|
||||
// It is legal for a message to have no attachment or an empty payload.
|
||||
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
|
||||
defer func() {
|
||||
var fd int = -1
|
||||
if rf != nil {
|
||||
fd = int(rf.Fd())
|
||||
}
|
||||
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
|
||||
}()
|
||||
|
||||
// Read header
|
||||
header := make([]byte, 4)
|
||||
nRead := uint32(0)
|
||||
|
||||
for nRead < 4 {
|
||||
n, err := conn.receiveUnix(header[nRead:])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
nRead = nRead + uint32(n)
|
||||
}
|
||||
|
||||
length, hasFd := parseHeader(header)
|
||||
|
||||
if hasFd {
|
||||
if len(conn.fds) == 0 {
|
||||
return nil, nil, fmt.Errorf("No expected file descriptor in message")
|
||||
}
|
||||
|
||||
rf = conn.fds[0]
|
||||
conn.fds = conn.fds[1:]
|
||||
}
|
||||
|
||||
rdata = make([]byte, length)
|
||||
|
||||
nRead = 0
|
||||
for nRead < length {
|
||||
n, err := conn.receiveUnix(rdata[nRead:])
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
nRead = nRead + uint32(n)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (conn *UnixConn) receiveUnix(buf []byte) (int, error) {
|
||||
oob := make([]byte, syscall.CmsgSpace(4))
|
||||
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
fd := extractFd(oob[:oobn])
|
||||
if fd != -1 {
|
||||
f := os.NewFile(uintptr(fd), "")
|
||||
conn.fds = append(conn.fds, f)
|
||||
}
|
||||
|
||||
return bufn, nil
|
||||
}
|
||||
|
||||
func (conn *UnixConn) sendUnix(data []byte, fds ...int) error {
|
||||
header, err := makeHeader(data, fds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// There is a bug in conn.WriteMsgUnix where it doesn't correctly return
|
||||
// the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645)
|
||||
// So, we can't rely on the return value from it. However, we must use it to
|
||||
// send the fds. In order to handle this we only write one byte using WriteMsgUnix
|
||||
// (when we have to), as that can only ever block or fully suceed. We then write
|
||||
// the rest with conn.Write()
|
||||
// The reader side should not rely on this though, as hopefully this gets fixed
|
||||
// in go later.
|
||||
written := 0
|
||||
if len(fds) != 0 {
|
||||
oob := syscall.UnixRights(fds...)
|
||||
wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
written = written + wrote
|
||||
}
|
||||
|
||||
for written < len(header) {
|
||||
wrote, err := conn.Write(header[written:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
written = written + wrote
|
||||
}
|
||||
|
||||
written = 0
|
||||
for written < len(data) {
|
||||
wrote, err := conn.Write(data[written:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
written = written + wrote
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractFd(oob []byte) int {
|
||||
// Grab forklock to make sure no forks accidentally inherit the new
|
||||
// fds before they are made CLOEXEC
|
||||
// There is a slight race condition between ReadMsgUnix returns and
|
||||
// when we grap the lock, so this is not perfect. Unfortunately
|
||||
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
|
||||
// way to implement non-blocking i/o in go, so this is hard to fix.
|
||||
syscall.ForkLock.Lock()
|
||||
defer syscall.ForkLock.Unlock()
|
||||
scms, err := syscall.ParseSocketControlMessage(oob)
|
||||
if err != nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
foundFd := -1
|
||||
for _, scm := range scms {
|
||||
fds, err := syscall.ParseUnixRights(&scm)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, fd := range fds {
|
||||
if foundFd == -1 {
|
||||
syscall.CloseOnExec(fd)
|
||||
foundFd = fd
|
||||
} else {
|
||||
syscall.Close(fd)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return foundFd
|
||||
}
|
||||
|
||||
func socketpair() ([2]int, error) {
|
||||
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
|
||||
}
|
||||
|
||||
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
|
||||
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
|
||||
// not bound to the underlying filesystem.
|
||||
// Messages sent on one end are received on the other, and vice-versa.
|
||||
// It is the caller's responsibility to close both ends.
|
||||
func SocketPair() (a *os.File, b *os.File, err error) {
|
||||
defer func() {
|
||||
var (
|
||||
fdA int = -1
|
||||
fdB int = -1
|
||||
)
|
||||
if a != nil {
|
||||
fdA = int(a.Fd())
|
||||
}
|
||||
if b != nil {
|
||||
fdB = int(b.Fd())
|
||||
}
|
||||
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
|
||||
}()
|
||||
pair, err := socketpair()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
|
||||
}
|
||||
|
||||
func USocketPair() (*UnixConn, *UnixConn, error) {
|
||||
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
|
||||
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
|
||||
a, b, err := SocketPair()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
uA, err := FileConn(a)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
uB, err := FileConn(b)
|
||||
if err != nil {
|
||||
uA.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
return uA, uB, nil
|
||||
}
|
||||
|
||||
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
|
||||
// returns an error if the file descriptor does not point to a unix socket.
|
||||
// This creates a duplicate file descriptor. It's the caller's responsibility
|
||||
// to close both.
|
||||
func FdConn(fd int) (n *net.UnixConn, err error) {
|
||||
{
|
||||
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
|
||||
}
|
||||
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
|
||||
conn, err := net.FileConn(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uconn, ok := conn.(*net.UnixConn)
|
||||
if !ok {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("%d: not a unix connection", fd)
|
||||
}
|
||||
return uconn, nil
|
||||
}
|
|
@ -1,237 +0,0 @@
|
|||
package beam
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSocketPair(t *testing.T) {
|
||||
a, b, err := SocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
a.Write([]byte("hello world!"))
|
||||
fmt.Printf("done writing. closing\n")
|
||||
a.Close()
|
||||
fmt.Printf("done closing\n")
|
||||
}()
|
||||
data, err := ioutil.ReadAll(b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fmt.Printf("--> %s\n", data)
|
||||
fmt.Printf("still open: %v\n", a.Fd())
|
||||
}
|
||||
|
||||
func TestUSocketPair(t *testing.T) {
|
||||
a, b, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
data := "hello world!"
|
||||
go func() {
|
||||
a.Write([]byte(data))
|
||||
a.Close()
|
||||
}()
|
||||
res := make([]byte, 1024)
|
||||
size, err := b.Read(res)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if size != len(data) {
|
||||
t.Fatal("Unexpected size")
|
||||
}
|
||||
if string(res[0:size]) != data {
|
||||
t.Fatal("Unexpected data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendUnixSocket(t *testing.T) {
|
||||
a1, a2, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// defer a1.Close()
|
||||
// defer a2.Close()
|
||||
b1, b2, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// defer b1.Close()
|
||||
// defer b2.Close()
|
||||
glueA, glueB, err := SocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// defer glueA.Close()
|
||||
// defer glueB.Close()
|
||||
go func() {
|
||||
err := b2.Send([]byte("a"), glueB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
err := a2.Send([]byte("b"), glueA)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
connAhdr, connA, err := a1.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(connAhdr) != "b" {
|
||||
t.Fatalf("unexpected: %s", connAhdr)
|
||||
}
|
||||
connBhdr, connB, err := b1.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(connBhdr) != "a" {
|
||||
t.Fatalf("unexpected: %s", connBhdr)
|
||||
}
|
||||
fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd())
|
||||
go func() {
|
||||
fmt.Printf("sending message on %v\n", connA.Fd())
|
||||
connA.Write([]byte("hello world"))
|
||||
connA.Sync()
|
||||
fmt.Printf("closing %v\n", connA.Fd())
|
||||
connA.Close()
|
||||
}()
|
||||
data, err := ioutil.ReadAll(connB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fmt.Printf("---> %s\n", data)
|
||||
|
||||
}
|
||||
|
||||
// Ensure we get proper segmenting of messages
|
||||
func TestSendSegmenting(t *testing.T) {
|
||||
a, b, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
extrafd1, extrafd2, err := SocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
extrafd2.Close()
|
||||
|
||||
go func() {
|
||||
a.Send([]byte("message 1"), nil)
|
||||
a.Send([]byte("message 2"), extrafd1)
|
||||
a.Send([]byte("message 3"), nil)
|
||||
}()
|
||||
|
||||
msg1, file1, err := b.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(msg1) != "message 1" {
|
||||
t.Fatal("unexpected msg1:", string(msg1))
|
||||
}
|
||||
if file1 != nil {
|
||||
t.Fatal("unexpectedly got file1")
|
||||
}
|
||||
|
||||
msg2, file2, err := b.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(msg2) != "message 2" {
|
||||
t.Fatal("unexpected msg2:", string(msg2))
|
||||
}
|
||||
if file2 == nil {
|
||||
t.Fatal("didn't get file2")
|
||||
}
|
||||
file2.Close()
|
||||
|
||||
msg3, file3, err := b.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(msg3) != "message 3" {
|
||||
t.Fatal("unexpected msg3:", string(msg3))
|
||||
}
|
||||
if file3 != nil {
|
||||
t.Fatal("unexpectedly got file3")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Test sending a zero byte message
|
||||
func TestSendEmpty(t *testing.T) {
|
||||
a, b, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
go func() {
|
||||
a.Send([]byte{}, nil)
|
||||
}()
|
||||
|
||||
msg, file, err := b.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(msg) != 0 {
|
||||
t.Fatalf("unexpected non-empty message: %v", msg)
|
||||
}
|
||||
if file != nil {
|
||||
t.Fatal("unexpectedly got file")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func makeLarge(size int) []byte {
|
||||
res := make([]byte, size)
|
||||
for i := range res {
|
||||
res[i] = byte(i % 255)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func verifyLarge(data []byte, size int) bool {
|
||||
if len(data) != size {
|
||||
return false
|
||||
}
|
||||
for i := range data {
|
||||
if data[i] != byte(i%255) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Test sending a large message
|
||||
func TestSendLarge(t *testing.T) {
|
||||
a, b, err := USocketPair()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
go func() {
|
||||
a.Send(makeLarge(100000), nil)
|
||||
}()
|
||||
|
||||
msg, file, err := b.Receive()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !verifyLarge(msg, 100000) {
|
||||
t.Fatalf("unexpected message (size %d)", len(msg))
|
||||
}
|
||||
if file != nil {
|
||||
t.Fatal("unexpectedly got file")
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue