From 78d7e8b25695863f1e1a60aedc1fb9653f03758c Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Mon, 6 Feb 2017 14:57:43 -0800 Subject: [PATCH] supervisor: implement monitoring Signed-off-by: Kenfe-Mickael Laventure --- api/execution/execution.pb.go | 158 +++--- api/execution/execution.proto | 1 - api/shim/shim.pb.go | 167 +++--- api/shim/shim.proto | 3 +- cmd/containerd/main.go | 67 +-- cmd/ctr/run.go | 52 +- cmd/ctr/utils.go | 7 +- events/nats.go | 40 +- shim/service.go | 14 +- supervisor/service.go | 128 ++++- supervisor/shim.go | 98 +++- .../nats-io/go-nats-streaming/stan.go | 476 ++++++++++++++++++ .../nats-io/go-nats-streaming/sub.go | 376 ++++++++++++++ 13 files changed, 1311 insertions(+), 276 deletions(-) create mode 100644 vendor/github.com/nats-io/go-nats-streaming/stan.go create mode 100644 vendor/github.com/nats-io/go-nats-streaming/sub.go diff --git a/api/execution/execution.pb.go b/api/execution/execution.pb.go index 9925bab..78829ec 100644 --- a/api/execution/execution.pb.go +++ b/api/execution/execution.pb.go @@ -122,8 +122,7 @@ func (*CreateContainerResponse) ProtoMessage() {} func (*CreateContainerResponse) Descriptor() ([]byte, []int) { return fileDescriptorExecution, []int{2} } type DeleteContainerRequest struct { - ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Pid uint32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` } func (m *DeleteContainerRequest) Reset() { *m = DeleteContainerRequest{} } @@ -363,10 +362,9 @@ func (this *DeleteContainerRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 5) s = append(s, "&execution.DeleteContainerRequest{") s = append(s, "ID: "+fmt.Sprintf("%#v", this.ID)+",\n") - s = append(s, "Pid: "+fmt.Sprintf("%#v", this.Pid)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1220,11 +1218,6 @@ func (m *DeleteContainerRequest) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintExecution(dAtA, i, uint64(len(m.ID))) i += copy(dAtA[i:], m.ID) } - if m.Pid != 0 { - dAtA[i] = 0x10 - i++ - i = encodeVarintExecution(dAtA, i, uint64(m.Pid)) - } return i, nil } @@ -1936,9 +1929,6 @@ func (m *DeleteContainerRequest) Size() (n int) { if l > 0 { n += 1 + l + sovExecution(uint64(l)) } - if m.Pid != 0 { - n += 1 + sovExecution(uint64(m.Pid)) - } return n } @@ -2259,7 +2249,6 @@ func (this *DeleteContainerRequest) String() string { } s := strings.Join([]string{`&DeleteContainerRequest{`, `ID:` + fmt.Sprintf("%v", this.ID) + `,`, - `Pid:` + fmt.Sprintf("%v", this.Pid) + `,`, `}`, }, "") return s @@ -2940,25 +2929,6 @@ func (m *DeleteContainerRequest) Unmarshal(dAtA []byte) error { } m.ID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Pid", wireType) - } - m.Pid = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowExecution - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Pid |= (uint32(b) & 0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := skipExecution(dAtA[iNdEx:]) @@ -5039,68 +5009,68 @@ var ( func init() { proto.RegisterFile("execution.proto", fileDescriptorExecution) } var fileDescriptorExecution = []byte{ - // 998 bytes of a gzipped FileDescriptorProto + // 996 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0xcf, 0x6f, 0xe3, 0x44, 0x14, 0xae, 0x93, 0x36, 0x69, 0x5f, 0x9a, 0x34, 0x4c, 0x53, 0x63, 0x05, 0x94, 0x16, 0xb3, 0xbb, - 0xac, 0x90, 0x9a, 0x2e, 0x01, 0xa1, 0x95, 0x38, 0x6d, 0x9b, 0x50, 0x8a, 0x4a, 0x08, 0x93, 0x8d, - 0x56, 0x20, 0x50, 0x71, 0xe3, 0x21, 0x6b, 0x94, 0xda, 0xc6, 0x33, 0xee, 0x2e, 0x37, 0xee, 0xfc, - 0x35, 0x1c, 0xf9, 0x0f, 0xf6, 0xc8, 0x11, 0x09, 0x69, 0x45, 0x73, 0xe0, 0xcc, 0x9f, 0x80, 0x66, - 0x3c, 0x76, 0xe2, 0x1f, 0x49, 0xc3, 0x76, 0x7b, 0x9b, 0x79, 0xfe, 0xe6, 0xf3, 0xf7, 0x9e, 0xe7, - 0x7d, 0x7e, 0xb0, 0x45, 0x9e, 0x93, 0xa1, 0xcf, 0x2c, 0xc7, 0x6e, 0xba, 0x9e, 0xc3, 0x1c, 0x54, - 0x1e, 0x3a, 0x36, 0x33, 0x2c, 0x9b, 0x78, 0x66, 0xf3, 0xf2, 0x83, 0xfa, 0x5b, 0x23, 0xc7, 0x19, - 0x8d, 0xc9, 0x81, 0x78, 0x78, 0xee, 0xff, 0x70, 0x40, 0x2e, 0x5c, 0xf6, 0x73, 0x80, 0xad, 0xd7, - 0x46, 0xce, 0xc8, 0x11, 0xcb, 0x03, 0xbe, 0x0a, 0xa2, 0xfa, 0x01, 0xec, 0xf4, 0x99, 0xe1, 0xb1, - 0xa3, 0x90, 0x08, 0x93, 0x9f, 0x7c, 0x42, 0x19, 0x52, 0x21, 0x67, 0x99, 0x9a, 0xb2, 0xa7, 0xdc, - 0xdf, 0x38, 0x2c, 0x4c, 0x5e, 0xee, 0xe6, 0x4e, 0xda, 0x38, 0x67, 0x99, 0xfa, 0x6f, 0x0a, 0xa8, - 0x47, 0x1e, 0x31, 0x18, 0x59, 0xf6, 0x08, 0xda, 0x85, 0xd2, 0xb9, 0x6f, 0x9b, 0x63, 0x72, 0xe6, - 0x1a, 0xec, 0xa9, 0x96, 0xe3, 0x00, 0x0c, 0x41, 0xa8, 0x67, 0xb0, 0xa7, 0x48, 0x83, 0xe2, 0xd0, - 0xb1, 0xa9, 0x33, 0x26, 0x5a, 0x7e, 0x4f, 0xb9, 0xbf, 0x8e, 0xc3, 0x2d, 0xaa, 0xc1, 0x1a, 0x65, - 0xa6, 0x65, 0x6b, 0xab, 0xe2, 0x50, 0xb0, 0x41, 0x2a, 0x14, 0x28, 0x33, 0x1d, 0x9f, 0x69, 0x6b, - 0x22, 0x2c, 0x77, 0x32, 0x4e, 0x3c, 0x4f, 0x2b, 0x44, 0x71, 0xe2, 0x79, 0xfa, 0xaf, 0x0a, 0xbc, - 0x99, 0xd2, 0x4c, 0x5d, 0xc7, 0xa6, 0x04, 0x7d, 0x0c, 0x1b, 0x51, 0x11, 0x85, 0xf6, 0x52, 0x4b, - 0x6b, 0xc6, 0xca, 0xda, 0x9c, 0x1e, 0x9a, 0x42, 0xd1, 0x43, 0x28, 0x59, 0xb6, 0xc5, 0x7a, 0x9e, - 0x33, 0x24, 0x94, 0x8a, 0xa4, 0x4a, 0x2d, 0x35, 0x71, 0x52, 0x3e, 0xc5, 0xb3, 0x50, 0xfd, 0x10, - 0xd4, 0x36, 0x19, 0x93, 0xff, 0x51, 0xc0, 0x2a, 0xe4, 0x5d, 0xcb, 0x14, 0xef, 0x28, 0x63, 0xbe, - 0xd4, 0xf7, 0x61, 0xe7, 0xd4, 0xa2, 0xd3, 0xaf, 0x46, 0x43, 0x8a, 0x1a, 0xac, 0x39, 0xcf, 0x82, - 0x54, 0xf2, 0xbc, 0x60, 0x62, 0xa3, 0x63, 0x50, 0x93, 0x70, 0x99, 0xfe, 0x43, 0x80, 0x48, 0x32, - 0x15, 0x87, 0x16, 0xe5, 0x3f, 0x83, 0xd5, 0xff, 0x52, 0x60, 0x5b, 0x5c, 0x9d, 0x30, 0x49, 0xa9, - 0xa0, 0x05, 0x9b, 0x11, 0xea, 0x2c, 0x4a, 0x67, 0x6b, 0xf2, 0x72, 0xb7, 0x14, 0x11, 0x9d, 0xb4, - 0x71, 0x29, 0x02, 0x9d, 0x98, 0xe8, 0x01, 0x14, 0xdd, 0xa5, 0x0a, 0x19, 0xc2, 0x6e, 0xfd, 0xca, - 0x7c, 0x06, 0xb5, 0x78, 0x72, 0xb2, 0x5e, 0x33, 0x4a, 0x95, 0xa5, 0x94, 0xea, 0x3f, 0xc2, 0x46, - 0x94, 0xf7, 0xdc, 0x2f, 0xac, 0x42, 0x21, 0xe8, 0x07, 0xd9, 0x1d, 0x72, 0x87, 0xf6, 0xb9, 0x3c, - 0x83, 0xf9, 0x54, 0x64, 0x53, 0x69, 0xed, 0x24, 0xde, 0xd6, 0x17, 0x0f, 0xb1, 0x04, 0xe9, 0xbf, - 0x2b, 0x50, 0x94, 0x02, 0xc2, 0x4b, 0xa3, 0x44, 0x97, 0x06, 0x21, 0x58, 0x35, 0xbc, 0x11, 0x2f, - 0x31, 0xbf, 0x1a, 0x62, 0xcd, 0x51, 0xc4, 0xbe, 0xd4, 0xf2, 0x22, 0xc4, 0x97, 0xe8, 0x3d, 0x58, - 0xf5, 0x29, 0xf1, 0xc4, 0x0b, 0x4b, 0xad, 0xed, 0xc4, 0x0b, 0x07, 0x94, 0x78, 0x58, 0x00, 0xf8, - 0xd1, 0xe1, 0x33, 0x53, 0xd6, 0x93, 0x2f, 0x51, 0x1d, 0xd6, 0x19, 0xf1, 0x2e, 0x2c, 0xdb, 0x18, - 0x8b, 0x72, 0xae, 0xe3, 0x68, 0xcf, 0x4d, 0x80, 0x3c, 0xb7, 0xd8, 0x99, 0x4c, 0xa7, 0x28, 0x64, - 0x01, 0x0f, 0x05, 0x39, 0xe8, 0x18, 0x56, 0x07, 0x92, 0xd6, 0x9f, 0xea, 0xf6, 0x83, 0xeb, 0x3f, - 0x9a, 0x5e, 0xff, 0x91, 0x65, 0xa2, 0x7b, 0x50, 0x31, 0x4c, 0xd3, 0xe2, 0x4e, 0x68, 0x8c, 0x8f, - 0x2d, 0x93, 0x8a, 0x04, 0xca, 0x38, 0x11, 0xd5, 0xf7, 0x61, 0xfb, 0x98, 0x2c, 0xef, 0x6d, 0x5d, - 0xa8, 0xc5, 0xe1, 0x37, 0xf3, 0x08, 0xfd, 0x02, 0xd4, 0x81, 0x6b, 0x66, 0x59, 0xe5, 0xab, 0x34, - 0xc9, 0x75, 0x36, 0xca, 0xbd, 0xbc, 0x67, 0xf8, 0x74, 0x69, 0x5f, 0xd1, 0x1f, 0x80, 0x8a, 0x09, - 0xf5, 0x2f, 0x96, 0x3f, 0xf1, 0x35, 0xbc, 0x71, 0x4c, 0x5e, 0x47, 0xc7, 0xa7, 0x2d, 0xed, 0x53, - 0x40, 0xb3, 0xd4, 0xaf, 0xdc, 0x6f, 0x0c, 0x6a, 0x7d, 0x6b, 0x64, 0x1b, 0xe3, 0xdb, 0x50, 0x29, - 0xfc, 0x42, 0xb0, 0x0b, 0xdb, 0x29, 0x63, 0xb9, 0xd3, 0xbf, 0x85, 0x5a, 0x60, 0xea, 0xb7, 0x52, - 0x9b, 0xcf, 0xa1, 0xc6, 0xfd, 0x5b, 0x72, 0x93, 0x9b, 0xb0, 0xeb, 0x5f, 0x04, 0xbf, 0x8e, 0x19, - 0x2e, 0x59, 0xea, 0x8f, 0x60, 0xc3, 0x0d, 0x83, 0xf2, 0x4f, 0x30, 0xaf, 0xd8, 0x53, 0xe0, 0xfb, - 0x9f, 0x40, 0x21, 0x68, 0x60, 0x54, 0x82, 0xe2, 0x11, 0xee, 0x3c, 0x7a, 0xdc, 0x69, 0x57, 0x57, - 0xf8, 0x06, 0x0f, 0xba, 0xdd, 0x93, 0xee, 0x71, 0x55, 0xe1, 0x9b, 0xfe, 0xe3, 0x2f, 0x7b, 0xbd, - 0x4e, 0xbb, 0x9a, 0x43, 0x00, 0x85, 0xde, 0xa3, 0x41, 0xbf, 0xd3, 0xae, 0xe6, 0x5b, 0xff, 0xac, - 0x43, 0xb5, 0x13, 0xce, 0x34, 0x7d, 0xe2, 0x5d, 0x5a, 0x43, 0x82, 0xbe, 0x87, 0xad, 0xc4, 0xcf, - 0x1a, 0xdd, 0x4d, 0x76, 0x5b, 0xe6, 0x00, 0x52, 0xbf, 0x77, 0x1d, 0x4c, 0x66, 0xda, 0x85, 0x4a, - 0x7c, 0xe8, 0x41, 0x77, 0xd2, 0xbe, 0x9a, 0x9e, 0x89, 0xea, 0x6a, 0x33, 0x18, 0xb0, 0x9a, 0xe1, - 0x80, 0xd5, 0xec, 0xf0, 0x01, 0x0b, 0xf5, 0x60, 0x2b, 0xd1, 0xe7, 0x29, 0xc5, 0xd9, 0x3e, 0x30, - 0x97, 0xb1, 0x0b, 0x95, 0x78, 0x2b, 0xa7, 0x14, 0x66, 0x76, 0xfa, 0x22, 0x85, 0x89, 0x4e, 0x4f, - 0x29, 0xcc, 0x76, 0x82, 0x45, 0x8c, 0x89, 0x29, 0x26, 0xc5, 0x98, 0x3d, 0xe5, 0xcc, 0x65, 0x7c, - 0x02, 0x9b, 0xb3, 0xee, 0x8b, 0xf4, 0x04, 0x5d, 0x86, 0x93, 0xd7, 0xdf, 0x5d, 0x88, 0x91, 0x9f, - 0xfb, 0x3b, 0xa8, 0xc4, 0xa7, 0x9f, 0x54, 0x31, 0x33, 0x67, 0xa9, 0xfa, 0xdd, 0x6b, 0x50, 0x92, - 0xfe, 0x09, 0x6c, 0xce, 0x8e, 0x0a, 0x29, 0xdd, 0x19, 0x43, 0x52, 0x4a, 0x77, 0xe6, 0xac, 0xf1, - 0x15, 0xc0, 0xd4, 0x11, 0xd1, 0x5e, 0x3a, 0xd5, 0x04, 0xe9, 0x3b, 0x0b, 0x10, 0x92, 0xf2, 0x14, - 0xca, 0x31, 0x73, 0x44, 0x29, 0x21, 0x19, 0xd6, 0x39, 0xf7, 0x8b, 0x9d, 0x42, 0x39, 0x66, 0x7a, - 0x29, 0xb6, 0x2c, 0x4b, 0x9c, 0xcb, 0xf6, 0x0d, 0x94, 0x63, 0xc6, 0x94, 0x62, 0xcb, 0xb2, 0xc0, - 0xfa, 0x9d, 0xc5, 0xa0, 0x20, 0xef, 0xc3, 0xb7, 0x5f, 0x5c, 0x35, 0x56, 0xfe, 0xbc, 0x6a, 0xac, - 0xfc, 0x7b, 0xd5, 0x50, 0x7e, 0x99, 0x34, 0x94, 0x17, 0x93, 0x86, 0xf2, 0xc7, 0xa4, 0xa1, 0xfc, - 0x3d, 0x69, 0x28, 0xe7, 0x05, 0xa1, 0xe4, 0xc3, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x24, 0xc6, - 0xc8, 0x88, 0x60, 0x0d, 0x00, 0x00, + 0xac, 0x90, 0x9a, 0x96, 0x80, 0xd0, 0x4a, 0x9c, 0x76, 0x9b, 0x50, 0x8a, 0x4a, 0x08, 0x93, 0x8d, + 0x56, 0x20, 0x50, 0x71, 0xe3, 0x21, 0x3b, 0x28, 0xb1, 0x83, 0x67, 0xdc, 0x5d, 0x6e, 0xdc, 0xf9, + 0x6b, 0x38, 0xf2, 0x1f, 0xec, 0x91, 0x23, 0x12, 0xd2, 0x8a, 0xe6, 0xc0, 0x99, 0x3f, 0x01, 0xcd, + 0x78, 0xe2, 0xc4, 0x3f, 0x92, 0x46, 0xbb, 0xf4, 0x36, 0xf3, 0xfc, 0xcd, 0xe7, 0xef, 0x3d, 0xcf, + 0xfb, 0xfc, 0x60, 0x87, 0x3c, 0x27, 0x7d, 0x9f, 0x53, 0xd7, 0xa9, 0x8f, 0x3d, 0x97, 0xbb, 0xa8, + 0xd8, 0x77, 0x1d, 0x6e, 0x51, 0x87, 0x78, 0x76, 0xfd, 0xea, 0x83, 0xea, 0x5b, 0x03, 0xd7, 0x1d, + 0x0c, 0xc9, 0x91, 0x7c, 0x78, 0xe9, 0xff, 0x70, 0x44, 0x46, 0x63, 0xfe, 0x73, 0x80, 0xad, 0x56, + 0x06, 0xee, 0xc0, 0x95, 0xcb, 0x23, 0xb1, 0x0a, 0xa2, 0xe6, 0x11, 0xec, 0x75, 0xb9, 0xe5, 0xf1, + 0x93, 0x29, 0x11, 0x26, 0x3f, 0xf9, 0x84, 0x71, 0xa4, 0x43, 0x86, 0xda, 0x86, 0x76, 0xa0, 0xdd, + 0xdf, 0x7a, 0x94, 0x9b, 0xbc, 0xdc, 0xcf, 0x9c, 0x35, 0x71, 0x86, 0xda, 0xe6, 0x6f, 0x1a, 0xe8, + 0x27, 0x1e, 0xb1, 0x38, 0x59, 0xf5, 0x08, 0xda, 0x87, 0xc2, 0xa5, 0xef, 0xd8, 0x43, 0x72, 0x31, + 0xb6, 0xf8, 0x53, 0x23, 0x23, 0x00, 0x18, 0x82, 0x50, 0xc7, 0xe2, 0x4f, 0x91, 0x01, 0xf9, 0xbe, + 0xeb, 0x30, 0x77, 0x48, 0x8c, 0xec, 0x81, 0x76, 0x7f, 0x13, 0x4f, 0xb7, 0xa8, 0x02, 0x1b, 0x8c, + 0xdb, 0xd4, 0x31, 0xd6, 0xe5, 0xa1, 0x60, 0x83, 0x74, 0xc8, 0x31, 0x6e, 0xbb, 0x3e, 0x37, 0x36, + 0x64, 0x58, 0xed, 0x54, 0x9c, 0x78, 0x9e, 0x91, 0x0b, 0xe3, 0xc4, 0xf3, 0xcc, 0x5f, 0x35, 0x78, + 0x33, 0xa1, 0x99, 0x8d, 0x5d, 0x87, 0x11, 0xf4, 0x31, 0x6c, 0x85, 0x45, 0x94, 0xda, 0x0b, 0x0d, + 0xa3, 0x1e, 0x29, 0x6b, 0x7d, 0x76, 0x68, 0x06, 0x45, 0x0f, 0xa0, 0x40, 0x1d, 0xca, 0x3b, 0x9e, + 0xdb, 0x27, 0x8c, 0xc9, 0xa4, 0x0a, 0x0d, 0x3d, 0x76, 0x52, 0x3d, 0xc5, 0xf3, 0x50, 0xf3, 0x18, + 0xf4, 0x26, 0x19, 0x92, 0xd5, 0x0b, 0x68, 0x1e, 0xc2, 0xde, 0x39, 0x65, 0xb3, 0x6f, 0xc4, 0xa6, + 0x07, 0x2a, 0xb0, 0xe1, 0x3e, 0x0b, 0x84, 0x67, 0x45, 0x79, 0xe4, 0xc6, 0xc4, 0xa0, 0xc7, 0xe1, + 0x2a, 0xd9, 0x07, 0x00, 0xa1, 0x40, 0x26, 0x0f, 0x2d, 0xcb, 0x76, 0x0e, 0x6b, 0xfe, 0xa5, 0xc1, + 0xae, 0xbc, 0x28, 0xd3, 0x94, 0x94, 0x82, 0x06, 0x6c, 0x87, 0xa8, 0x8b, 0x50, 0xfc, 0xce, 0xe4, + 0xe5, 0x7e, 0x21, 0x24, 0x3a, 0x6b, 0xe2, 0x42, 0x08, 0x3a, 0xb3, 0xd1, 0x31, 0xe4, 0xc7, 0x2b, + 0x95, 0x6d, 0x0a, 0xbb, 0xf5, 0x0b, 0xf2, 0x19, 0x54, 0xa2, 0xc9, 0xa9, 0x7a, 0xcd, 0x29, 0xd5, + 0x56, 0x52, 0x6a, 0xfe, 0x08, 0x5b, 0x61, 0xde, 0x0b, 0x1b, 0x42, 0x87, 0x5c, 0x70, 0xfb, 0x55, + 0x2f, 0xa8, 0x1d, 0x3a, 0x14, 0xf2, 0x2c, 0xee, 0x33, 0x99, 0x4d, 0xa9, 0xb1, 0x17, 0x7b, 0x5b, + 0x57, 0x3e, 0xc4, 0x0a, 0x64, 0xfe, 0xae, 0x41, 0x5e, 0x09, 0x40, 0x65, 0xc8, 0x8e, 0xd5, 0xbb, + 0x8a, 0x58, 0x2c, 0x11, 0x82, 0x75, 0xcb, 0x1b, 0x88, 0x12, 0x8b, 0xab, 0x21, 0xd7, 0x02, 0x45, + 0x9c, 0x2b, 0x23, 0x2b, 0x43, 0x62, 0x89, 0xde, 0x83, 0x75, 0x9f, 0x11, 0x4f, 0xbe, 0xb0, 0xd0, + 0xd8, 0x8d, 0xbd, 0xb0, 0xc7, 0x88, 0x87, 0x25, 0x40, 0x1c, 0xed, 0x3f, 0xb3, 0x55, 0x3d, 0xc5, + 0x12, 0x55, 0x61, 0x93, 0x13, 0x6f, 0x44, 0x1d, 0x6b, 0x28, 0xcb, 0xb9, 0x89, 0xc3, 0xbd, 0x68, + 0x79, 0xf2, 0x9c, 0xf2, 0x0b, 0x95, 0x4e, 0x5e, 0xca, 0x02, 0x11, 0x0a, 0x72, 0x30, 0x31, 0xac, + 0xf7, 0x14, 0xad, 0x3f, 0xd3, 0xed, 0x53, 0x5b, 0x44, 0x06, 0xd4, 0x96, 0x95, 0x29, 0x62, 0xb1, + 0x44, 0xf7, 0xa0, 0x64, 0xd9, 0x36, 0x15, 0xbe, 0x67, 0x0d, 0x4f, 0xa9, 0xcd, 0x64, 0x02, 0x45, + 0x1c, 0x8b, 0x9a, 0x87, 0xb0, 0x7b, 0x4a, 0x56, 0x77, 0xb2, 0x36, 0x54, 0xa2, 0xf0, 0xd7, 0x73, + 0x04, 0x73, 0x04, 0x7a, 0x6f, 0x6c, 0xa7, 0x19, 0xe3, 0xab, 0x34, 0xc9, 0x4d, 0xa6, 0x29, 0x9c, + 0xbb, 0x63, 0xf9, 0x6c, 0x75, 0x17, 0x39, 0x06, 0x1d, 0x13, 0xe6, 0x8f, 0x56, 0x3f, 0xf1, 0x35, + 0xbc, 0x71, 0x4a, 0xfe, 0x8f, 0x8e, 0x57, 0xb7, 0x33, 0x13, 0xde, 0x4e, 0xf3, 0x53, 0x40, 0xf3, + 0xd4, 0xaf, 0xdc, 0x6f, 0x1c, 0x2a, 0x5d, 0x3a, 0x70, 0xac, 0xe1, 0x6d, 0xa8, 0x94, 0x7e, 0x21, + 0xd9, 0xa5, 0xed, 0x14, 0xb1, 0xda, 0x99, 0xdf, 0x42, 0x25, 0xb0, 0xf0, 0x5b, 0xa9, 0xcd, 0xe7, + 0x50, 0x11, 0xfe, 0xad, 0xb8, 0xc9, 0xeb, 0xb0, 0x9b, 0x5f, 0x04, 0xbf, 0x8e, 0x39, 0x2e, 0x55, + 0xea, 0x8f, 0x60, 0x6b, 0x3c, 0x0d, 0xaa, 0x3f, 0xc1, 0xa2, 0x62, 0xcf, 0x80, 0xef, 0x7f, 0x02, + 0xb9, 0xa0, 0x81, 0x51, 0x01, 0xf2, 0x27, 0xb8, 0xf5, 0xf0, 0x71, 0xab, 0x59, 0x5e, 0x13, 0x1b, + 0xdc, 0x6b, 0xb7, 0xcf, 0xda, 0xa7, 0x65, 0x4d, 0x6c, 0xba, 0x8f, 0xbf, 0xec, 0x74, 0x5a, 0xcd, + 0x72, 0x06, 0x01, 0xe4, 0x3a, 0x0f, 0x7b, 0xdd, 0x56, 0xb3, 0x9c, 0x6d, 0xfc, 0xb3, 0x09, 0xe5, + 0xd6, 0x74, 0x82, 0xe9, 0x12, 0xef, 0x8a, 0xf6, 0x09, 0xfa, 0x1e, 0x76, 0x62, 0xbf, 0x66, 0x74, + 0x37, 0xde, 0x6d, 0xa9, 0xe3, 0x46, 0xf5, 0xde, 0x4d, 0x30, 0x95, 0x69, 0x1b, 0x4a, 0xd1, 0x11, + 0x07, 0xdd, 0x49, 0xfa, 0x6a, 0x72, 0x02, 0xaa, 0xea, 0xf5, 0x60, 0x9c, 0xaa, 0x4f, 0xc7, 0xa9, + 0x7a, 0x4b, 0x8c, 0x53, 0xa8, 0x03, 0x3b, 0xb1, 0x3e, 0x4f, 0x28, 0x4e, 0xf7, 0x81, 0x85, 0x8c, + 0x6d, 0x28, 0x45, 0x5b, 0x39, 0xa1, 0x30, 0xb5, 0xd3, 0x97, 0x29, 0x8c, 0x75, 0x7a, 0x42, 0x61, + 0xba, 0x13, 0x2c, 0x63, 0x8c, 0xcd, 0x2c, 0x09, 0xc6, 0xf4, 0x99, 0x66, 0x21, 0xe3, 0x13, 0xd8, + 0x9e, 0x77, 0x5f, 0x64, 0xc6, 0xe8, 0x52, 0x9c, 0xbc, 0xfa, 0xee, 0x52, 0x8c, 0xfa, 0xdc, 0xdf, + 0x41, 0x29, 0x3a, 0xfd, 0x24, 0x8a, 0x99, 0x3a, 0x4b, 0x55, 0xef, 0xde, 0x80, 0x52, 0xf4, 0x4f, + 0x60, 0x7b, 0x7e, 0x54, 0x48, 0xe8, 0x4e, 0x19, 0x92, 0x12, 0xba, 0x53, 0x67, 0x8d, 0xaf, 0x00, + 0x66, 0x8e, 0x88, 0x0e, 0x92, 0xa9, 0xc6, 0x48, 0xdf, 0x59, 0x82, 0x50, 0x94, 0xe7, 0x50, 0x8c, + 0x98, 0x23, 0x4a, 0x08, 0x49, 0xb1, 0xce, 0x85, 0x5f, 0xec, 0x1c, 0x8a, 0x11, 0xd3, 0x4b, 0xb0, + 0xa5, 0x59, 0xe2, 0x42, 0xb6, 0x6f, 0xa0, 0x18, 0x31, 0xa6, 0x04, 0x5b, 0x9a, 0x05, 0x56, 0xef, + 0x2c, 0x07, 0x05, 0x79, 0x3f, 0x7a, 0xfb, 0xc5, 0x75, 0x6d, 0xed, 0xcf, 0xeb, 0xda, 0xda, 0xbf, + 0xd7, 0x35, 0xed, 0x97, 0x49, 0x4d, 0x7b, 0x31, 0xa9, 0x69, 0x7f, 0x4c, 0x6a, 0xda, 0xdf, 0x93, + 0x9a, 0x76, 0x99, 0x93, 0x4a, 0x3e, 0xfc, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xf5, 0x7a, 0x45, 0x38, + 0x4e, 0x0d, 0x00, 0x00, } diff --git a/api/execution/execution.proto b/api/execution/execution.proto index a19c915..55d9c86 100644 --- a/api/execution/execution.proto +++ b/api/execution/execution.proto @@ -43,7 +43,6 @@ message CreateContainerResponse { message DeleteContainerRequest { string id = 1 [(gogoproto.customname) = "ID"]; - uint32 pid = 2; } message ListContainersRequest { diff --git a/api/shim/shim.pb.go b/api/shim/shim.pb.go index e1d64ed..ae311d4 100644 --- a/api/shim/shim.pb.go +++ b/api/shim/shim.pb.go @@ -249,7 +249,8 @@ func (*StateRequest) Descriptor() ([]byte, []int) { return fileDescriptorShim, [ type StateResponse struct { ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Bundle string `protobuf:"bytes,2,opt,name=bundle,proto3" json:"bundle,omitempty"` - Processes []*Process `protobuf:"bytes,3,rep,name=processes" json:"processes,omitempty"` + InitPid uint32 `protobuf:"varint,3,opt,name=initPid,proto3" json:"initPid,omitempty"` + Processes []*Process `protobuf:"bytes,4,rep,name=processes" json:"processes,omitempty"` } func (m *StateResponse) Reset() { *m = StateResponse{} } @@ -463,10 +464,11 @@ func (this *StateResponse) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&shim.StateResponse{") s = append(s, "ID: "+fmt.Sprintf("%#v", this.ID)+",\n") s = append(s, "Bundle: "+fmt.Sprintf("%#v", this.Bundle)+",\n") + s = append(s, "InitPid: "+fmt.Sprintf("%#v", this.InitPid)+",\n") if this.Processes != nil { s = append(s, "Processes: "+fmt.Sprintf("%#v", this.Processes)+",\n") } @@ -1432,9 +1434,14 @@ func (m *StateResponse) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintShim(dAtA, i, uint64(len(m.Bundle))) i += copy(dAtA[i:], m.Bundle) } + if m.InitPid != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintShim(dAtA, i, uint64(m.InitPid)) + } if len(m.Processes) > 0 { for _, msg := range m.Processes { - dAtA[i] = 0x1a + dAtA[i] = 0x22 i++ i = encodeVarintShim(dAtA, i, uint64(msg.Size())) n, err := msg.MarshalTo(dAtA[i:]) @@ -1772,6 +1779,9 @@ func (m *StateResponse) Size() (n int) { if l > 0 { n += 1 + l + sovShim(uint64(l)) } + if m.InitPid != 0 { + n += 1 + sovShim(uint64(m.InitPid)) + } if len(m.Processes) > 0 { for _, e := range m.Processes { l = e.Size() @@ -1980,6 +1990,7 @@ func (this *StateResponse) String() string { s := strings.Join([]string{`&StateResponse{`, `ID:` + fmt.Sprintf("%v", this.ID) + `,`, `Bundle:` + fmt.Sprintf("%v", this.Bundle) + `,`, + `InitPid:` + fmt.Sprintf("%v", this.InitPid) + `,`, `Processes:` + strings.Replace(fmt.Sprintf("%v", this.Processes), "Process", "Process", 1) + `,`, `}`, }, "") @@ -3725,6 +3736,25 @@ func (m *StateResponse) Unmarshal(dAtA []byte) error { m.Bundle = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field InitPid", wireType) + } + m.InitPid = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowShim + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.InitPid |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Processes", wireType) } @@ -4072,69 +4102,70 @@ var ( func init() { proto.RegisterFile("shim.proto", fileDescriptorShim) } var fileDescriptorShim = []byte{ - // 1018 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x55, 0xcf, 0x6e, 0xe3, 0xb6, - 0x13, 0x8e, 0x2c, 0xf9, 0xdf, 0x38, 0x72, 0xfc, 0x23, 0x16, 0x81, 0xe2, 0xfd, 0xd5, 0x71, 0xd4, - 0x43, 0xd3, 0xa0, 0x70, 0x9a, 0xb4, 0x97, 0x16, 0xe8, 0x21, 0x1b, 0x0b, 0xdb, 0x00, 0x69, 0x62, - 0xd0, 0x0e, 0xb0, 0x37, 0x43, 0x89, 0x18, 0x9b, 0x80, 0x2c, 0xa9, 0x24, 0x95, 0x3f, 0x3d, 0xf5, - 0xd2, 0x47, 0xe8, 0x53, 0xf4, 0x45, 0xf6, 0xd8, 0x53, 0xd1, 0x53, 0xd1, 0xe4, 0x09, 0xfa, 0x08, - 0x05, 0x49, 0xc9, 0x4e, 0x36, 0xd2, 0x02, 0xbd, 0xcd, 0x7c, 0xfa, 0x34, 0x9c, 0x6f, 0x38, 0x33, - 0x04, 0xe0, 0x73, 0xba, 0x18, 0x24, 0x2c, 0x16, 0x31, 0x42, 0x57, 0x71, 0x24, 0x7c, 0x1a, 0x11, - 0x16, 0x0c, 0x14, 0x7c, 0x73, 0xd0, 0x7d, 0x3d, 0x8b, 0xe3, 0x59, 0x48, 0xf6, 0x15, 0xe3, 0x32, - 0xbd, 0xde, 0x27, 0x8b, 0x44, 0xdc, 0xeb, 0x1f, 0xba, 0xaf, 0x66, 0xf1, 0x2c, 0x56, 0xe6, 0xbe, - 0xb4, 0x34, 0xea, 0xfe, 0x61, 0x80, 0x7d, 0xcc, 0x88, 0x2f, 0x08, 0x26, 0x3f, 0xa6, 0x84, 0x0b, - 0xb4, 0x09, 0x15, 0x1a, 0x38, 0x46, 0xdf, 0xd8, 0x6d, 0xbe, 0xa9, 0x3d, 0xfe, 0xb5, 0x5d, 0x39, - 0x19, 0xe2, 0x0a, 0x0d, 0xd0, 0x26, 0xd4, 0x2e, 0xd3, 0x28, 0x08, 0x89, 0x53, 0x91, 0xdf, 0x70, - 0xe6, 0x21, 0x07, 0xea, 0x2c, 0x8d, 0x04, 0x5d, 0x10, 0xc7, 0x54, 0x1f, 0x72, 0x17, 0x6d, 0x41, - 0x23, 0x8a, 0xa7, 0x09, 0xbd, 0x89, 0x85, 0x63, 0xf5, 0x8d, 0xdd, 0x06, 0xae, 0x47, 0xf1, 0x48, - 0xba, 0xa8, 0x0b, 0x0d, 0x41, 0xd8, 0x82, 0x46, 0x7e, 0xe8, 0x54, 0xd5, 0xa7, 0xa5, 0x8f, 0x5e, - 0x41, 0x95, 0x8b, 0x80, 0x46, 0x4e, 0x4d, 0x85, 0xd3, 0x8e, 0x3c, 0x9e, 0x8b, 0x20, 0x4e, 0x85, - 0x53, 0xd7, 0xc7, 0x6b, 0x2f, 0xc3, 0x09, 0x63, 0x4e, 0x63, 0x89, 0x13, 0xc6, 0x5c, 0x17, 0xda, - 0xb9, 0x2e, 0x9e, 0xc4, 0x11, 0x27, 0xa8, 0x03, 0x66, 0x92, 0x29, 0xb3, 0xb1, 0x34, 0xdd, 0x36, - 0xac, 0x8f, 0x85, 0xcf, 0x44, 0x26, 0xdd, 0xdd, 0x01, 0x7b, 0x48, 0x42, 0xb2, 0xaa, 0xc5, 0xcb, - 0x5f, 0x0e, 0xa0, 0x9d, 0x53, 0xb2, 0xb0, 0xdb, 0xd0, 0x22, 0x77, 0x54, 0x4c, 0xb9, 0xf0, 0x45, - 0xca, 0x33, 0x2e, 0x48, 0x68, 0xac, 0x10, 0xf7, 0x37, 0x13, 0x5a, 0xde, 0x1d, 0xb9, 0xca, 0x83, - 0x3e, 0xd5, 0x6e, 0x94, 0x69, 0xaf, 0x14, 0x6b, 0x37, 0x4b, 0xb4, 0x5b, 0x4f, 0xb5, 0xa3, 0x4f, - 0xc1, 0xe6, 0x24, 0xa4, 0x51, 0x7a, 0x37, 0x0d, 0xfd, 0x4b, 0xa2, 0x4b, 0xdc, 0xc4, 0xeb, 0x19, - 0x78, 0x2a, 0x31, 0xf4, 0x05, 0x58, 0x29, 0x27, 0x4c, 0x55, 0xb9, 0x75, 0xe8, 0x0c, 0x5e, 0xf6, - 0xd3, 0xe0, 0x82, 0x13, 0x86, 0x15, 0x0b, 0x21, 0xb0, 0x7c, 0x36, 0xe3, 0x4e, 0xbd, 0x6f, 0xee, - 0x36, 0xb1, 0xb2, 0x65, 0x75, 0x48, 0x74, 0xe3, 0x34, 0x14, 0x24, 0x4d, 0x89, 0x5c, 0xdd, 0x06, - 0x4e, 0x53, 0x1d, 0x27, 0x4d, 0xe4, 0xc2, 0xfa, 0x95, 0x9f, 0xf8, 0x97, 0x34, 0xa4, 0x82, 0x12, - 0xee, 0x80, 0x22, 0x3f, 0xc3, 0xd0, 0xd7, 0x50, 0x67, 0x21, 0x5d, 0x50, 0xc1, 0x9d, 0x56, 0xdf, - 0xdc, 0x6d, 0x1d, 0x76, 0x8b, 0x92, 0xc1, 0x8a, 0x82, 0x73, 0x2a, 0xda, 0x83, 0xff, 0x45, 0xf1, - 0x34, 0x22, 0xb7, 0xd3, 0x84, 0xd1, 0x1b, 0x1a, 0x92, 0x19, 0xe1, 0xce, 0xba, 0xaa, 0xe7, 0x46, - 0x14, 0x9f, 0x91, 0xdb, 0xd1, 0x12, 0x46, 0x9f, 0x43, 0xc7, 0x4f, 0x12, 0x9f, 0x2d, 0x62, 0x36, - 0x4d, 0x58, 0x7c, 0x4d, 0x43, 0xe2, 0xd8, 0x2a, 0xc9, 0x8d, 0x1c, 0x1f, 0x69, 0xd8, 0x1d, 0x83, - 0x25, 0x65, 0x4b, 0x29, 0xe9, 0xea, 0xea, 0x53, 0x1a, 0x48, 0x64, 0x46, 0x03, 0x75, 0x33, 0x36, - 0x96, 0x26, 0xfa, 0x0c, 0x36, 0xfc, 0x20, 0xa0, 0x82, 0xc6, 0x91, 0x1f, 0x4e, 0x67, 0x34, 0xe0, - 0x8e, 0xd9, 0x37, 0x77, 0x6d, 0xdc, 0x5e, 0xc1, 0x6f, 0x69, 0xc0, 0xdd, 0x21, 0xd4, 0x74, 0xfa, - 0xb2, 0x8e, 0xe2, 0x3e, 0x21, 0x7a, 0xbe, 0xb0, 0xb2, 0x25, 0x36, 0xf7, 0x99, 0x8e, 0x6c, 0x61, - 0x65, 0x4b, 0x8c, 0xc7, 0xd7, 0xfa, 0xc2, 0x2d, 0xac, 0x6c, 0xb7, 0x0f, 0xeb, 0xba, 0x8f, 0x4a, - 0x1b, 0xfa, 0x14, 0x60, 0x24, 0xee, 0x4b, 0xbb, 0x57, 0xb6, 0xd7, 0x2d, 0x0d, 0xc4, 0x3c, 0x13, - 0xa1, 0x1d, 0xd9, 0x46, 0x73, 0x42, 0x67, 0x73, 0x7d, 0x9a, 0x8d, 0x33, 0xcf, 0xdd, 0x00, 0xdb, - 0xbb, 0x21, 0x91, 0xe0, 0xf9, 0x7c, 0xfc, 0x62, 0x40, 0x55, 0x21, 0xa5, 0x4b, 0xe2, 0x20, 0x93, - 0x27, 0xe3, 0xb7, 0x0f, 0x3f, 0x29, 0xba, 0x47, 0x15, 0x60, 0x72, 0x9f, 0x90, 0x4c, 0x7d, 0x96, - 0xa5, 0xb9, 0xca, 0xf2, 0x83, 0x89, 0xb2, 0x5e, 0x4c, 0x94, 0x9e, 0xdb, 0xe5, 0x98, 0xba, 0x3f, - 0x81, 0x9d, 0xf9, 0x59, 0x65, 0xfe, 0xeb, 0x0e, 0xfb, 0x06, 0x9a, 0x09, 0x8b, 0xaf, 0x08, 0xe7, - 0x44, 0x5f, 0x61, 0xeb, 0xf0, 0x75, 0x51, 0xee, 0x23, 0x4d, 0xc2, 0x2b, 0xb6, 0x7b, 0x0a, 0xf5, - 0x0c, 0x2d, 0xa8, 0xf7, 0xbe, 0x1c, 0x67, 0x5f, 0xe4, 0xf5, 0xd8, 0x2a, 0x8a, 0xa9, 0x33, 0xd7, - 0x3c, 0xa9, 0x6c, 0xe4, 0xa7, 0x7c, 0xa9, 0x6c, 0x03, 0x6c, 0x4c, 0x78, 0xba, 0xc8, 0x81, 0x3d, - 0x0f, 0x9a, 0xcb, 0x02, 0xa2, 0x06, 0x58, 0xde, 0xbb, 0x93, 0x49, 0x67, 0x0d, 0xd5, 0xc1, 0x3c, - 0x3f, 0xff, 0xa1, 0x63, 0x20, 0x80, 0xda, 0x31, 0xf6, 0x8e, 0x26, 0x5e, 0xa7, 0x82, 0x9a, 0x50, - 0x1d, 0x4f, 0x8e, 0xf0, 0xa4, 0x63, 0xa2, 0x36, 0x80, 0xf7, 0xce, 0x3b, 0x9e, 0x1e, 0x0d, 0x87, - 0xde, 0xb0, 0x63, 0xed, 0x7d, 0x0b, 0x55, 0x75, 0x2e, 0x6a, 0x41, 0x7d, 0x3c, 0x39, 0x1f, 0x8d, - 0xbc, 0x61, 0x67, 0x4d, 0x3a, 0xf8, 0xe2, 0xec, 0xec, 0xe4, 0xec, 0xad, 0x8e, 0x34, 0x3a, 0xba, - 0x18, 0x7b, 0xc3, 0x4e, 0x45, 0x7e, 0xd0, 0x51, 0x87, 0x1d, 0xf3, 0xf0, 0xd7, 0x2a, 0x58, 0xe3, - 0x39, 0x5d, 0xa0, 0x73, 0xa8, 0xe9, 0x15, 0x8b, 0x76, 0x8a, 0x84, 0x3d, 0x7b, 0x56, 0xba, 0xee, - 0xc7, 0x28, 0xd9, 0xb5, 0x1d, 0xa9, 0xac, 0x98, 0x40, 0xfd, 0x92, 0x42, 0x2d, 0x57, 0x75, 0x77, - 0x73, 0xa0, 0xdf, 0xba, 0x41, 0xfe, 0xd6, 0x0d, 0x3c, 0xf9, 0xd6, 0xc9, 0x9c, 0xf4, 0x7e, 0x2e, - 0xce, 0xe9, 0xd9, 0x7a, 0x2f, 0xce, 0xe9, 0x83, 0xf5, 0x7e, 0x02, 0x96, 0x1c, 0x3a, 0xb4, 0x5d, - 0xd8, 0xcb, 0xab, 0xb5, 0xde, 0xed, 0x97, 0x13, 0xb2, 0x50, 0xdf, 0x81, 0x39, 0x12, 0xf7, 0xa8, - 0x57, 0xd8, 0x59, 0xcb, 0xb1, 0x2d, 0x95, 0xf6, 0x3d, 0xd4, 0xf4, 0x38, 0x16, 0x4b, 0x7b, 0x36, - 0xaa, 0xdd, 0xad, 0x52, 0xca, 0x97, 0x06, 0x3a, 0xcd, 0x6f, 0xbf, 0x5f, 0xde, 0x90, 0x59, 0x9c, - 0x9d, 0x8f, 0x30, 0x56, 0xb7, 0xa6, 0x7a, 0xb6, 0x38, 0xda, 0xd3, 0x76, 0x2e, 0x95, 0x76, 0x0c, - 0x35, 0xdd, 0xe6, 0xc5, 0xd2, 0x9e, 0x8d, 0x40, 0x59, 0x90, 0x37, 0xff, 0x7f, 0xff, 0xd0, 0x5b, - 0xfb, 0xf3, 0xa1, 0xb7, 0xf6, 0xcf, 0x43, 0xcf, 0xf8, 0xf9, 0xb1, 0x67, 0xbc, 0x7f, 0xec, 0x19, - 0xbf, 0x3f, 0xf6, 0x8c, 0xbf, 0x1f, 0x7b, 0xc6, 0x65, 0x4d, 0xb1, 0xbf, 0xfa, 0x37, 0x00, 0x00, - 0xff, 0xff, 0x62, 0xce, 0xef, 0x76, 0x44, 0x09, 0x00, 0x00, + // 1027 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x55, 0xcf, 0x6e, 0xe3, 0xb6, + 0x13, 0x8e, 0x2c, 0xf9, 0xdf, 0x38, 0x76, 0xfc, 0x23, 0x16, 0x81, 0xe2, 0xfd, 0xd5, 0x71, 0xd4, + 0x43, 0xd3, 0xa0, 0x70, 0x9a, 0xb4, 0x97, 0x16, 0xe8, 0x21, 0x1b, 0x0b, 0xdb, 0x00, 0x69, 0x22, + 0xd0, 0x0e, 0xb0, 0x37, 0x43, 0x89, 0x18, 0x9b, 0x80, 0x2c, 0xa9, 0x24, 0x95, 0x3f, 0xb7, 0x5e, + 0xfa, 0x06, 0xed, 0x53, 0xf4, 0x45, 0xf6, 0xd8, 0x53, 0xd1, 0x53, 0xd1, 0xe4, 0x09, 0xfa, 0x08, + 0x05, 0x49, 0xc9, 0x4e, 0x36, 0xd2, 0x1e, 0x7a, 0x9b, 0xf9, 0xf4, 0x69, 0x38, 0xdf, 0x70, 0x66, + 0x08, 0xc0, 0xe7, 0x74, 0x31, 0x4c, 0x58, 0x2c, 0x62, 0x84, 0xae, 0xe2, 0x48, 0xf8, 0x34, 0x22, + 0x2c, 0x18, 0x2a, 0xf8, 0xe6, 0xa0, 0xf7, 0x7a, 0x16, 0xc7, 0xb3, 0x90, 0xec, 0x2b, 0xc6, 0x65, + 0x7a, 0xbd, 0x4f, 0x16, 0x89, 0xb8, 0xd7, 0x3f, 0xf4, 0x5e, 0xcd, 0xe2, 0x59, 0xac, 0xcc, 0x7d, + 0x69, 0x69, 0xd4, 0xf9, 0xc3, 0x80, 0xf6, 0x31, 0x23, 0xbe, 0x20, 0x98, 0xfc, 0x98, 0x12, 0x2e, + 0xd0, 0x26, 0x54, 0x68, 0x60, 0x1b, 0x03, 0x63, 0xb7, 0xf9, 0xa6, 0xf6, 0xf8, 0xd7, 0x76, 0xe5, + 0x64, 0x84, 0x2b, 0x34, 0x40, 0x9b, 0x50, 0xbb, 0x4c, 0xa3, 0x20, 0x24, 0x76, 0x45, 0x7e, 0xc3, + 0x99, 0x87, 0x6c, 0xa8, 0xb3, 0x34, 0x12, 0x74, 0x41, 0x6c, 0x53, 0x7d, 0xc8, 0x5d, 0xb4, 0x05, + 0x8d, 0x28, 0x9e, 0x26, 0xf4, 0x26, 0x16, 0xb6, 0x35, 0x30, 0x76, 0x1b, 0xb8, 0x1e, 0xc5, 0x9e, + 0x74, 0x51, 0x0f, 0x1a, 0x82, 0xb0, 0x05, 0x8d, 0xfc, 0xd0, 0xae, 0xaa, 0x4f, 0x4b, 0x1f, 0xbd, + 0x82, 0x2a, 0x17, 0x01, 0x8d, 0xec, 0x9a, 0x0a, 0xa7, 0x1d, 0x79, 0x3c, 0x17, 0x41, 0x9c, 0x0a, + 0xbb, 0xae, 0x8f, 0xd7, 0x5e, 0x86, 0x13, 0xc6, 0xec, 0xc6, 0x12, 0x27, 0x8c, 0x39, 0x0e, 0x74, + 0x72, 0x5d, 0x3c, 0x89, 0x23, 0x4e, 0x50, 0x17, 0xcc, 0x24, 0x53, 0xd6, 0xc6, 0xd2, 0x74, 0x3a, + 0xb0, 0x3e, 0x16, 0x3e, 0x13, 0x99, 0x74, 0x67, 0x07, 0xda, 0x23, 0x12, 0x92, 0x55, 0x2d, 0x5e, + 0xfe, 0x72, 0x00, 0x9d, 0x9c, 0x92, 0x85, 0xdd, 0x86, 0x16, 0xb9, 0xa3, 0x62, 0xca, 0x85, 0x2f, + 0x52, 0x9e, 0x71, 0x41, 0x42, 0x63, 0x85, 0x38, 0xbf, 0x99, 0xd0, 0x72, 0xef, 0xc8, 0x55, 0x1e, + 0xf4, 0xa9, 0x76, 0xa3, 0x4c, 0x7b, 0xa5, 0x58, 0xbb, 0x59, 0xa2, 0xdd, 0x7a, 0xaa, 0x1d, 0x7d, + 0x0a, 0x6d, 0x4e, 0x42, 0x1a, 0xa5, 0x77, 0xd3, 0xd0, 0xbf, 0x24, 0xba, 0xc4, 0x4d, 0xbc, 0x9e, + 0x81, 0xa7, 0x12, 0x43, 0x5f, 0x80, 0x95, 0x72, 0xc2, 0x54, 0x95, 0x5b, 0x87, 0xf6, 0xf0, 0x65, + 0x3f, 0x0d, 0x2f, 0x38, 0x61, 0x58, 0xb1, 0x10, 0x02, 0xcb, 0x67, 0x33, 0x6e, 0xd7, 0x07, 0xe6, + 0x6e, 0x13, 0x2b, 0x5b, 0x56, 0x87, 0x44, 0x37, 0x76, 0x43, 0x41, 0xd2, 0x94, 0xc8, 0xd5, 0x6d, + 0x60, 0x37, 0xd5, 0x71, 0xd2, 0x44, 0x0e, 0xac, 0x5f, 0xf9, 0x89, 0x7f, 0x49, 0x43, 0x2a, 0x28, + 0xe1, 0x36, 0x28, 0xf2, 0x33, 0x0c, 0x7d, 0x0d, 0x75, 0x16, 0xd2, 0x05, 0x15, 0xdc, 0x6e, 0x0d, + 0xcc, 0xdd, 0xd6, 0x61, 0xaf, 0x28, 0x19, 0xac, 0x28, 0x38, 0xa7, 0xa2, 0x3d, 0xf8, 0x5f, 0x14, + 0x4f, 0x23, 0x72, 0x3b, 0x4d, 0x18, 0xbd, 0xa1, 0x21, 0x99, 0x11, 0x6e, 0xaf, 0xab, 0x7a, 0x6e, + 0x44, 0xf1, 0x19, 0xb9, 0xf5, 0x96, 0x30, 0xfa, 0x1c, 0xba, 0x7e, 0x92, 0xf8, 0x6c, 0x11, 0xb3, + 0x69, 0xc2, 0xe2, 0x6b, 0x1a, 0x12, 0xbb, 0xad, 0x92, 0xdc, 0xc8, 0x71, 0x4f, 0xc3, 0xce, 0x18, + 0x2c, 0x29, 0x5b, 0x4a, 0x49, 0x57, 0x57, 0x9f, 0xd2, 0x40, 0x22, 0x33, 0x1a, 0xa8, 0x9b, 0x69, + 0x63, 0x69, 0xa2, 0xcf, 0x60, 0xc3, 0x0f, 0x02, 0x2a, 0x68, 0x1c, 0xf9, 0xe1, 0x74, 0x46, 0x03, + 0x6e, 0x9b, 0x03, 0x73, 0xb7, 0x8d, 0x3b, 0x2b, 0xf8, 0x2d, 0x0d, 0xb8, 0x33, 0x82, 0x9a, 0x4e, + 0x5f, 0xd6, 0x51, 0xdc, 0x27, 0x44, 0xcf, 0x17, 0x56, 0xb6, 0xc4, 0xe6, 0x3e, 0xd3, 0x91, 0x2d, + 0xac, 0x6c, 0x89, 0xf1, 0xf8, 0x5a, 0x5f, 0xb8, 0x85, 0x95, 0xed, 0x0c, 0x60, 0x5d, 0xf7, 0x51, + 0x69, 0x43, 0x9f, 0x02, 0x78, 0xe2, 0xbe, 0xb4, 0x7b, 0x65, 0x7b, 0xdd, 0xd2, 0x40, 0xcc, 0x33, + 0x11, 0xda, 0x91, 0x6d, 0x34, 0x27, 0x74, 0x36, 0xd7, 0xa7, 0xb5, 0x71, 0xe6, 0x39, 0x1b, 0xd0, + 0x76, 0x6f, 0x48, 0x24, 0x78, 0x3e, 0x1f, 0x3f, 0x1b, 0x50, 0x55, 0x48, 0xe9, 0x92, 0x38, 0xc8, + 0xe4, 0xc9, 0xf8, 0x9d, 0xc3, 0x4f, 0x8a, 0xee, 0x51, 0x05, 0x98, 0xdc, 0x27, 0x24, 0x53, 0x9f, + 0x65, 0x69, 0xae, 0xb2, 0xfc, 0x60, 0xa2, 0xac, 0x17, 0x13, 0xa5, 0xe7, 0x76, 0x39, 0xa6, 0xce, + 0x2f, 0x06, 0xb4, 0x33, 0x20, 0x2b, 0xcd, 0x7f, 0x58, 0x62, 0x34, 0xa2, 0xc2, 0x5b, 0x26, 0x92, + 0xbb, 0xe8, 0x1b, 0x68, 0x26, 0x2c, 0xbe, 0x22, 0x9c, 0x13, 0x99, 0x8a, 0x6c, 0xcf, 0xd7, 0x45, + 0xb2, 0x3c, 0x4d, 0xc2, 0x2b, 0xb6, 0x73, 0x0a, 0xf5, 0x0c, 0x2d, 0xb8, 0x8a, 0x7d, 0x39, 0xe9, + 0xbe, 0xc8, 0x4b, 0xb5, 0x55, 0x14, 0x53, 0x6b, 0xd2, 0x3c, 0x29, 0xda, 0xf3, 0x53, 0xbe, 0x14, + 0xbd, 0x01, 0x6d, 0x4c, 0x78, 0xba, 0xc8, 0x81, 0x3d, 0x17, 0x9a, 0xcb, 0xda, 0xa2, 0x06, 0x58, + 0xee, 0xbb, 0x93, 0x49, 0x77, 0x0d, 0xd5, 0xc1, 0x3c, 0x3f, 0xff, 0xa1, 0x6b, 0x20, 0x80, 0xda, + 0x31, 0x76, 0x8f, 0x26, 0x6e, 0xb7, 0x82, 0x9a, 0x50, 0x1d, 0x4f, 0x8e, 0xf0, 0xa4, 0x6b, 0xa2, + 0x0e, 0x80, 0xfb, 0xce, 0x3d, 0x9e, 0x1e, 0x8d, 0x46, 0xee, 0xa8, 0x6b, 0xed, 0x7d, 0x0b, 0x55, + 0x75, 0x2e, 0x6a, 0x41, 0x7d, 0x3c, 0x39, 0xf7, 0x3c, 0x77, 0xd4, 0x5d, 0x93, 0x0e, 0xbe, 0x38, + 0x3b, 0x3b, 0x39, 0x7b, 0xab, 0x23, 0x79, 0x47, 0x17, 0x63, 0x77, 0xd4, 0xad, 0xc8, 0x0f, 0x3a, + 0xea, 0xa8, 0x6b, 0x1e, 0xfe, 0x5a, 0x05, 0x6b, 0x3c, 0xa7, 0x0b, 0x74, 0x0e, 0x35, 0xbd, 0x7d, + 0xd1, 0x4e, 0x91, 0xb0, 0x67, 0x2f, 0x4e, 0xcf, 0xf9, 0x18, 0x25, 0xbb, 0xd0, 0x23, 0x95, 0x15, + 0x13, 0x68, 0x50, 0x52, 0xa8, 0xe5, 0x16, 0xef, 0x6d, 0x0e, 0xf5, 0x33, 0x38, 0xcc, 0x9f, 0xc1, + 0xa1, 0x2b, 0x9f, 0x41, 0x99, 0x93, 0x5e, 0xdd, 0xc5, 0x39, 0x3d, 0xdb, 0xfc, 0xc5, 0x39, 0x7d, + 0xb0, 0xf9, 0x4f, 0xc0, 0x92, 0xf3, 0x88, 0xb6, 0x0b, 0xdb, 0x7c, 0xb5, 0xf1, 0x7b, 0x83, 0x72, + 0x42, 0x16, 0xea, 0x3b, 0x30, 0x3d, 0x71, 0x8f, 0xfa, 0x85, 0x9d, 0xb5, 0x9c, 0xe8, 0x52, 0x69, + 0xdf, 0x43, 0x4d, 0x4f, 0x6a, 0xb1, 0xb4, 0x67, 0x53, 0xdc, 0xdb, 0x2a, 0xa5, 0x7c, 0x69, 0xa0, + 0xd3, 0xfc, 0xf6, 0x07, 0xe5, 0x0d, 0x99, 0xc5, 0xd9, 0xf9, 0x08, 0x63, 0x75, 0x6b, 0xaa, 0x67, + 0x8b, 0xa3, 0x3d, 0x6d, 0xe7, 0x52, 0x69, 0xc7, 0x50, 0xd3, 0x6d, 0x5e, 0x2c, 0xed, 0xd9, 0x08, + 0x94, 0x05, 0x79, 0xf3, 0xff, 0xf7, 0x0f, 0xfd, 0xb5, 0x3f, 0x1f, 0xfa, 0x6b, 0xff, 0x3c, 0xf4, + 0x8d, 0x9f, 0x1e, 0xfb, 0xc6, 0xfb, 0xc7, 0xbe, 0xf1, 0xfb, 0x63, 0xdf, 0xf8, 0xfb, 0xb1, 0x6f, + 0x5c, 0xd6, 0x14, 0xfb, 0xab, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x48, 0xee, 0xf6, 0x5f, + 0x09, 0x00, 0x00, } diff --git a/api/shim/shim.proto b/api/shim/shim.proto index 6fc4a15..e03fee3 100644 --- a/api/shim/shim.proto +++ b/api/shim/shim.proto @@ -105,7 +105,8 @@ message StateRequest { message StateResponse { string id = 1 [(gogoproto.customname) = "ID"]; string bundle = 2; - repeated Process processes = 3; + uint32 initPid = 3; + repeated Process processes = 4; } // TODO: share core runtime types between shim and execution rpcs diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index b4a83c0..31407e9 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -25,8 +25,10 @@ import ( "github.com/docker/containerd/supervisor" "github.com/docker/containerd/utils" metrics "github.com/docker/go-metrics" + "github.com/pkg/errors" "github.com/urfave/cli" + natsd "github.com/nats-io/gnatsd/server" "github.com/nats-io/go-nats" stand "github.com/nats-io/nats-streaming-server/server" ) @@ -41,6 +43,11 @@ const usage = ` high performance container runtime ` +const ( + StanClusterID = "containerd" + stanClientID = "containerd" +) + func main() { app := cli.NewApp() app.Name = "containerd" @@ -126,14 +133,12 @@ func main() { } // Get events publisher - nec, err := getNATSPublisher(ea) + natsPoster, err := events.NewNATSPoster(StanClusterID, stanClientID) if err != nil { return err } - defer nec.Close() - execCtx := log.WithModule(ctx, "execution") - execCtx = events.WithPoster(execCtx, events.GetNATSPoster(nec)) + execCtx = events.WithPoster(execCtx, natsPoster) execService, err := supervisor.New(execCtx, context.GlobalString("root")) if err != nil { return err @@ -145,7 +150,7 @@ func main() { switch info.Server.(type) { case api.ExecutionServiceServer: ctx = log.WithModule(ctx, "execution") - ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + ctx = events.WithPoster(ctx, natsPoster) default: fmt.Printf("Unknown type: %#v\n", info.Server) } @@ -211,25 +216,10 @@ func dumpStacks(ctx gocontext.Context) { log.G(ctx).Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } -func startNATSServer(eventsAddress string) (e *stand.StanServer, err error) { - eventsURL, err := url.Parse(eventsAddress) - if err != nil { - return nil, err - } - - no := stand.DefaultNatsServerOptions - nOpts := &no - nOpts.NoSigs = true - parts := strings.Split(eventsURL.Host, ":") - nOpts.Host = parts[0] - if len(parts) == 2 { - nOpts.Port, err = strconv.Atoi(parts[1]) - } else { - nOpts.Port = nats.DefaultPort - } +func startNATSServer(address string) (s *stand.StanServer, err error) { defer func() { if r := recover(); r != nil { - e = nil + s = nil if _, ok := r.(error); !ok { err = fmt.Errorf("failed to start NATS server: %v", r) } else { @@ -237,21 +227,32 @@ func startNATSServer(eventsAddress string) (e *stand.StanServer, err error) { } } }() - s := stand.RunServerWithOpts(nil, nOpts) - - return s, nil -} - -func getNATSPublisher(eventsAddress string) (*nats.EncodedConn, error) { - nc, err := nats.Connect(eventsAddress) + so, no, err := getServerOptions(address) if err != nil { return nil, err } - nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + s = stand.RunServerWithOpts(so, no) + + return s, err +} + +func getServerOptions(address string) (*stand.Options, *natsd.Options, error) { + url, err := url.Parse(address) if err != nil { - nc.Close() - return nil, err + return nil, nil, errors.Wrapf(err, "failed to parse address url %q", address) } - return nec, nil + no := stand.DefaultNatsServerOptions + parts := strings.Split(url.Host, ":") + if len(parts) == 2 { + no.Port, err = strconv.Atoi(parts[1]) + } else { + no.Port = nats.DefaultPort + } + no.Host = parts[0] + + so := stand.GetDefaultOptions() + so.ID = StanClusterID + + return so, &no, nil } diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index fbd9ed3..b23015f 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "os" "path/filepath" @@ -8,10 +9,12 @@ import ( gocontext "context" + "github.com/crosbymichael/console" "github.com/docker/containerd/api/execution" execEvents "github.com/docker/containerd/execution" - "github.com/docker/docker/pkg/term" "github.com/nats-io/go-nats" + "github.com/nats-io/go-nats-streaming" + "github.com/pkg/errors" "github.com/urfave/cli" ) @@ -39,20 +42,23 @@ var runCommand = cli.Command{ } // setup our event subscriber - nc, err := nats.Connect(nats.DefaultURL) + sc, err := stan.Connect("containerd", "ctr", stan.ConnectWait(5*time.Second)) if err != nil { return err } - nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - nc.Close() - return err - } - defer nec.Close() + defer sc.Close() evCh := make(chan *execEvents.ContainerEvent, 64) - sub, err := nec.Subscribe(execEvents.ContainersEventsSubjectSubscriber, func(e *execEvents.ContainerEvent) { - evCh <- e + sub, err := sc.Subscribe(fmt.Sprintf("containers.%s", id), func(m *stan.Msg) { + var e execEvents.ContainerEvent + + err := json.Unmarshal(m.Data, &e) + if err != nil { + fmt.Printf("failed to unmarshal event: %v", err) + return + } + + evCh <- &e }) if err != nil { return err @@ -78,19 +84,12 @@ var runCommand = cli.Command{ Stderr: filepath.Join(tmpDir, "stderr"), } - var oldState *term.State - restoreTerm := func() { - if oldState != nil { - term.RestoreTerminal(os.Stdin.Fd(), oldState) - } - } - if crOpts.Console { - oldState, err = term.SetRawTerminal(os.Stdin.Fd()) - if err != nil { + con := console.Current() + defer con.Reset() + if err := con.SetRaw(); err != nil { return err } - defer restoreTerm() } fwg, err := prepareStdio(crOpts.Stdin, crOpts.Stdout, crOpts.Stderr, crOpts.Console) @@ -100,13 +99,13 @@ var runCommand = cli.Command{ cr, err := executionService.CreateContainer(gocontext.Background(), crOpts) if err != nil { - return err + return errors.Wrap(err, "CreateContainer RPC failed") } if _, err := executionService.StartContainer(gocontext.Background(), &execution.StartContainerRequest{ ID: cr.Container.ID, }); err != nil { - return err + return errors.Wrap(err, "StartContainer RPC failed") } var ec uint32 @@ -123,7 +122,7 @@ var runCommand = cli.Command{ break eventLoop } case <-time.After(1 * time.Second): - if nec.Conn.Status() != nats.CONNECTED { + if sc.NatsConn().Status() != nats.CONNECTED { break eventLoop } } @@ -132,14 +131,15 @@ var runCommand = cli.Command{ if _, err := executionService.DeleteContainer(gocontext.Background(), &execution.DeleteContainerRequest{ ID: cr.Container.ID, }); err != nil { - return err + return errors.Wrap(err, "DeleteContainer RPC failed") } // Ensure we read all io fwg.Wait() - restoreTerm() - os.Exit(int(ec)) + if ec != 0 { + return cli.NewExitError("", int(ec)) + } return nil }, diff --git a/cmd/ctr/utils.go b/cmd/ctr/utils.go index 398033c..b2ded47 100644 --- a/cmd/ctr/utils.go +++ b/cmd/ctr/utils.go @@ -14,8 +14,8 @@ import ( gocontext "context" - "github.com/Sirupsen/logrus" "github.com/docker/containerd/api/execution" + "github.com/pkg/errors" "github.com/tonistiigi/fifo" "github.com/urfave/cli" "google.golang.org/grpc" @@ -39,7 +39,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup, }(f) go func(w io.WriteCloser) { io.Copy(w, os.Stdin) - logrus.Info("stdin copy finished") w.Close() }(f) @@ -56,7 +55,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup, go func(r io.ReadCloser) { io.Copy(os.Stdout, r) r.Close() - logrus.Info("stdout copy finished") wg.Done() }(f) @@ -74,7 +72,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup, go func(r io.ReadCloser) { io.Copy(os.Stderr, r) r.Close() - logrus.Info("stderr copy finished") wg.Done() }(f) } @@ -99,7 +96,7 @@ func getGRPCConnection(context *cli.Context) (*grpc.ClientConn, error) { conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to dial %q", bindSocket) } grpcConn = conn diff --git a/events/nats.go b/events/nats.go index f833567..d05a497 100644 --- a/events/nats.go +++ b/events/nats.go @@ -2,30 +2,48 @@ package events import ( "context" - "strings" + "encoding/json" + "time" + "github.com/Sirupsen/logrus" "github.com/docker/containerd/log" - nats "github.com/nats-io/go-nats" + "github.com/nats-io/go-nats-streaming" + "github.com/pkg/errors" ) type natsPoster struct { - nec *nats.EncodedConn + sc stan.Conn } -func GetNATSPoster(nec *nats.EncodedConn) Poster { - return &natsPoster{nec} +func NewNATSPoster(clusterID, clientID string) (Poster, error) { + sc, err := stan.Connect(clusterID, clientID, stan.ConnectWait(5*time.Second)) + if err != nil { + return nil, errors.Wrap(err, "failed to connect to nats streaming server") + } + return &natsPoster{sc}, nil + } func (p *natsPoster) Post(ctx context.Context, e Event) { - subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1) topic := getTopic(ctx) - if topic != "" { - subject = strings.Join([]string{subject, topic}, ".") + if topic == "" { + log.G(ctx).WithField("event", e).Warn("unable to post event, topic is empty") + return } - if subject == "" { - log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty") + data, err := json.Marshal(e) + if err != nil { + log.G(ctx).WithError(err).WithFields(logrus.Fields{"event": e, "topic": topic}). + Warn("unable to marshal event") + return } - p.nec.Publish(subject, e) + err = p.sc.Publish(topic, data) + if err != nil { + log.G(ctx).WithError(err).WithFields(logrus.Fields{"event": e, "topic": topic}). + Warn("unable to post event") + } + + log.G(ctx).WithFields(logrus.Fields{"event": e, "topic": topic}). + Debug("Posted event") } diff --git a/shim/service.go b/shim/service.go index 5610ca5..69dff11 100644 --- a/shim/service.go +++ b/shim/service.go @@ -1,7 +1,6 @@ package shim import ( - "fmt" "sync" "syscall" @@ -9,6 +8,7 @@ import ( apishim "github.com/docker/containerd/api/shim" "github.com/docker/containerd/utils" google_protobuf "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -25,6 +25,7 @@ func New() *Service { type Service struct { initProcess *initProcess id string + bundle string mu sync.Mutex processes map[int]process events chan *apishim.Event @@ -37,10 +38,11 @@ func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishi return nil, err } s.mu.Lock() + s.id = r.ID + s.bundle = r.Bundle s.initProcess = process pid := process.Pid() s.processes[pid] = process - s.id = r.ID s.mu.Unlock() s.events <- &apishim.Event{ Type: apishim.EventType_CREATE, @@ -69,7 +71,7 @@ func (s *Service) Delete(ctx context.Context, r *apishim.DeleteRequest) (*apishi p, ok := s.processes[int(r.Pid)] s.mu.Unlock() if !ok { - return nil, fmt.Errorf("process does not exist %d", r.Pid) + return nil, errors.Errorf("process does not exist %d", r.Pid) } if err := p.Delete(ctx); err != nil { return nil, err @@ -104,7 +106,7 @@ func (s *Service) Exec(ctx context.Context, r *apishim.ExecRequest) (*apishim.Ex func (s *Service) Pty(ctx context.Context, r *apishim.PtyRequest) (*google_protobuf.Empty, error) { if r.Pid == 0 { - return nil, fmt.Errorf("pid not provided in request") + return nil, errors.Errorf("pid not provided in request") } ws := console.WinSize{ Width: uint16(r.Width), @@ -114,7 +116,7 @@ func (s *Service) Pty(ctx context.Context, r *apishim.PtyRequest) (*google_proto p, ok := s.processes[int(r.Pid)] s.mu.Unlock() if !ok { - return nil, fmt.Errorf("process does not exist %d", r.Pid) + return nil, errors.Errorf("process does not exist %d", r.Pid) } if err := p.Resize(ws); err != nil { return nil, err @@ -134,6 +136,8 @@ func (s *Service) Events(r *apishim.EventsRequest, stream apishim.Shim_EventsSer func (s *Service) State(ctx context.Context, r *apishim.StateRequest) (*apishim.StateResponse, error) { o := &apishim.StateResponse{ ID: s.id, + Bundle: s.bundle, + InitPid: uint32(s.initProcess.Pid()), Processes: []*apishim.Process{}, } s.mu.Lock() diff --git a/supervisor/service.go b/supervisor/service.go index f3693ed..e6c5667 100644 --- a/supervisor/service.go +++ b/supervisor/service.go @@ -3,12 +3,19 @@ package supervisor import ( "fmt" "io/ioutil" + "os" "path/filepath" + "strings" "sync" + "time" api "github.com/docker/containerd/api/execution" "github.com/docker/containerd/api/shim" + "github.com/docker/containerd/events" + "github.com/docker/containerd/execution" + "github.com/docker/containerd/log" google_protobuf "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -19,16 +26,22 @@ var ( // New creates a new GRPC services for execution func New(ctx context.Context, root string) (*Service, error) { - clients, err := loadClients(root) + ctx = log.WithModule(ctx, "supervisor") + log.G(ctx).WithField("root", root).Debugf("New()") + if err := os.MkdirAll(root, 0700); err != nil { + return nil, errors.Wrapf(err, "unable to create root directory %q", root) + } + clients, err := loadClients(ctx, root) if err != nil { return nil, err } s := &Service{ root: root, shims: clients, + ctx: ctx, } for _, c := range clients { - if err := s.monitor(c); err != nil { + if err := s.monitor(events.GetPoster(ctx), c); err != nil { return nil, err } } @@ -38,24 +51,23 @@ func New(ctx context.Context, root string) (*Service, error) { type Service struct { mu sync.Mutex + ctx context.Context root string - shims map[string]shim.ShimClient + shims map[string]*shimClient } func (s *Service) CreateContainer(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { - s.mu.Lock() - if _, ok := s.shims[r.ID]; ok { - s.mu.Unlock() - return nil, fmt.Errorf("container already exists %q", r.ID) - } - client, err := newShimClient(filepath.Join(s.root, r.ID)) + client, err := s.newShim(r.ID) if err != nil { - s.mu.Unlock() return nil, err } - s.shims[r.ID] = client - s.mu.Unlock() - if err := s.monitor(client); err != nil { + defer func() { + if err != nil { + s.removeShim(r.ID) + } + }() + + if err := s.monitor(events.GetPoster(ctx), client); err != nil { return nil, err } createResponse, err := client.Create(ctx, &shim.CreateRequest{ @@ -67,8 +79,9 @@ func (s *Service) CreateContainer(ctx context.Context, r *api.CreateContainerReq Stderr: r.Stderr, }) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "shim create request failed") } + client.initPid = createResponse.Pid return &api.CreateContainerResponse{ Container: &api.Container{ ID: r.ID, @@ -96,11 +109,12 @@ func (s *Service) DeleteContainer(ctx context.Context, r *api.DeleteContainerReq return nil, err } _, err = client.Delete(ctx, &shim.DeleteRequest{ - Pid: r.Pid, + Pid: client.initPid, }) if err != nil { return nil, err } + s.removeShim(r.ID) return empty, nil } @@ -180,13 +194,65 @@ func (s *Service) ListProcesses(ctx context.Context, r *api.ListProcessesRequest // monitor monitors the shim's event rpc and forwards container and process // events to callers -func (s *Service) monitor(client shim.ShimClient) error { +func (s *Service) monitor(poster events.Poster, client *shimClient) error { + // we use the service context here because we don't want to be + // tied to the Create rpc call + stream, err := client.Events(s.ctx, &shim.EventsRequest{}) + if err != nil { + return errors.Wrapf(err, "failed to get events stream for client at %q", client.root) + } + + go func() { + for { + e, err := stream.Recv() + if err != nil { + if err.Error() == "EOF" || strings.Contains(err.Error(), "transport is closing") { + break + } + log.G(s.ctx).WithError(err).WithField("container", client.id). + Warnf("event stream for client at %q got terminated", client.root) + break + } + + var topic string + if e.Type == shim.EventType_CREATE { + topic = "containers" + } else { + topic = fmt.Sprintf("containers.%s", e.ID) + } + + ctx := events.WithTopic(s.ctx, topic) + poster.Post(ctx, execution.ContainerEvent{ + Timestamp: time.Now(), + ID: e.ID, + Type: toExecutionEventType(e.Type), + Pid: e.Pid, + ExitStatus: e.ExitStatus, + }) + } + }() return nil } -func (s *Service) getShim(id string) (shim.ShimClient, error) { +func (s *Service) newShim(id string) (*shimClient, error) { s.mu.Lock() defer s.mu.Unlock() + + if _, ok := s.shims[id]; ok { + return nil, errors.Errorf("container %q already exists", id) + } + client, err := newShimClient(filepath.Join(s.root, id), id) + if err != nil { + return nil, err + } + s.shims[id] = client + return client, nil +} + +func (s *Service) getShim(id string) (*shimClient, error) { + s.mu.Lock() + defer s.mu.Unlock() + client, ok := s.shims[id] if !ok { return nil, fmt.Errorf("container does not exist %q", id) @@ -194,22 +260,40 @@ func (s *Service) getShim(id string) (shim.ShimClient, error) { return client, nil } -func loadClients(root string) (map[string]shim.ShimClient, error) { +func (s *Service) removeShim(id string) { + s.mu.Lock() + defer s.mu.Unlock() + + client, ok := s.shims[id] + if ok { + client.stop() + delete(s.shims, id) + } +} + +func loadClients(ctx context.Context, root string) (map[string]*shimClient, error) { files, err := ioutil.ReadDir(root) if err != nil { return nil, err } - out := make(map[string]shim.ShimClient) + out := make(map[string]*shimClient) for _, f := range files { if !f.IsDir() { continue } - socket := filepath.Join(root, f.Name(), "shim.sock") - client, err := connectToShim(socket) + // + id := f.Name() + client, err := loadShimClient(filepath.Join(root, id), id) if err != nil { - return nil, err + log.G(ctx).WithError(err).WithField("id", id).Warn("failed to load container") + // TODO: send an exit event with 255 as exit status + continue } out[f.Name()] = client } return out, nil } + +func toExecutionEventType(et shim.EventType) string { + return strings.Replace(strings.ToLower(et.String()), "_", "-", -1) +} diff --git a/supervisor/shim.go b/supervisor/shim.go index 7a0c632..922c7e5 100644 --- a/supervisor/shim.go +++ b/supervisor/shim.go @@ -1,29 +1,106 @@ package supervisor import ( + "context" "fmt" "io/ioutil" "log" "net" + "os" "os/exec" "path/filepath" + "syscall" "time" "github.com/docker/containerd/api/shim" + "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" ) -func newShimClient(root string) (shim.ShimClient, error) { - // TODO: start the shim process - cmd := exec.Command("containerd-shim") - if err := cmd.Start(); err != nil { - return nil, err +func newShimClient(root, id string) (*shimClient, error) { + if err := os.Mkdir(root, 0700); err != nil { + return nil, errors.Wrap(err, "failed to create shim working dir") } + + cmd := exec.Command("containerd-shim") cmd.Dir = root + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + if err := cmd.Start(); err != nil { + return nil, errors.Wrapf(err, "failed to start shim") + } socket := filepath.Join(root, "shim.sock") - return connectToShim(socket) + sc, err := connectToShim(socket) + if err != nil { + syscall.Kill(cmd.Process.Pid, syscall.SIGKILL) + cmd.Wait() + return nil, err + } + + s := &shimClient{ + ShimClient: sc, + shimCmd: cmd, + syncCh: make(chan struct{}), + root: root, + id: id, + } + go func() { + cmd.Wait() + close(s.syncCh) + }() + + return s, nil +} + +func loadShimClient(root, id string) (*shimClient, error) { + socket := filepath.Join(root, "shim.sock") + client, err := connectToShim(socket) + if err != nil { + // TODO: failed to connect to the shim, check if it's alive + // - if it is kill it + // - in both case call runc killall and runc delete on the id + return nil, err + } + + resp, err := client.State(context.Background(), &shim.StateRequest{}) + if err != nil { + return nil, errors.Wrapf(err, "failed to fetch state for container %s", id) + } + + return &shimClient{ + ShimClient: client, + root: root, + id: id, + initPid: resp.InitPid, + }, nil +} + +type shimClient struct { + shim.ShimClient + shimCmd *exec.Cmd + syncCh chan struct{} + root string + id string + initPid uint32 +} + +func (s *shimClient) stop() { + if s.shimCmd != nil { + select { + case <-s.syncCh: + default: + syscall.Kill(s.shimCmd.Process.Pid, syscall.SIGTERM) + select { + case <-s.syncCh: + case <-time.After(10 * time.Second): + syscall.Kill(s.shimCmd.Process.Pid, syscall.SIGKILL) + } + } + } + os.RemoveAll(s.root) } func connectToShim(socket string) (shim.ShimClient, error) { @@ -33,12 +110,13 @@ func connectToShim(socket string) (shim.ShimClient, error) { dialOpts = append(dialOpts, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", socket, timeout) - }, - )) - // FIXME: probably need a retry here + }), + grpc.WithBlock(), + grpc.WithTimeout(2*time.Second), + ) conn, err := grpc.Dial(fmt.Sprintf("unix://%s", socket), dialOpts...) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to connect to shim via \"%s\"", fmt.Sprintf("unix://%s", socket)) } return shim.NewShimClient(conn), nil } diff --git a/vendor/github.com/nats-io/go-nats-streaming/stan.go b/vendor/github.com/nats-io/go-nats-streaming/stan.go new file mode 100644 index 0000000..248f5ee --- /dev/null +++ b/vendor/github.com/nats-io/go-nats-streaming/stan.go @@ -0,0 +1,476 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io). +package stan + +import ( + "errors" + "fmt" + "runtime" + "sync" + "time" + + "github.com/nats-io/go-nats" + "github.com/nats-io/go-nats-streaming/pb" + "github.com/nats-io/nuid" +) + +// Version is the NATS Streaming Go Client version +const Version = "0.3.4" + +const ( + // DefaultNatsURL is the default URL the client connects to + DefaultNatsURL = "nats://localhost:4222" + // DefaultConnectWait is the default timeout used for the connect operation + DefaultConnectWait = 2 * time.Second + // DefaultDiscoverPrefix is the prefix subject used to connect to the NATS Streaming server + DefaultDiscoverPrefix = "_STAN.discover" + // DefaultACKPrefix is the prefix subject used to send ACKs to the NATS Streaming server + DefaultACKPrefix = "_STAN.acks" + // DefaultMaxPubAcksInflight is the default maximum number of published messages + // without outstanding ACKs from the server + DefaultMaxPubAcksInflight = 16384 +) + +// Conn represents a connection to the NATS Streaming subsystem. It can Publish and +// Subscribe to messages within the NATS Streaming cluster. +type Conn interface { + // Publish + Publish(subject string, data []byte) error + PublishAsync(subject string, data []byte, ah AckHandler) (string, error) + + // Subscribe + Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error) + + // QueueSubscribe + QueueSubscribe(subject, qgroup string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error) + + // Close + Close() error + + // NatsConn returns the underlying NATS conn. Use this with care. For + // example, closing the wrapped NATS conn will put the NATS Streaming Conn + // in an invalid state. + NatsConn() *nats.Conn +} + +// Errors +var ( + ErrConnectReqTimeout = errors.New("stan: connect request timeout") + ErrCloseReqTimeout = errors.New("stan: close request timeout") + ErrSubReqTimeout = errors.New("stan: subscribe request timeout") + ErrUnsubReqTimeout = errors.New("stan: unsubscribe request timeout") + ErrConnectionClosed = errors.New("stan: connection closed") + ErrTimeout = errors.New("stan: publish ack timeout") + ErrBadAck = errors.New("stan: malformed ack") + ErrBadSubscription = errors.New("stan: invalid subscription") + ErrBadConnection = errors.New("stan: invalid connection") + ErrManualAck = errors.New("stan: cannot manually ack in auto-ack mode") + ErrNilMsg = errors.New("stan: nil message") + ErrNoServerSupport = errors.New("stan: not supported by server") +) + +// AckHandler is used for Async Publishing to provide status of the ack. +// The func will be passed teh GUID and any error state. No error means the +// message was successfully received by NATS Streaming. +type AckHandler func(string, error) + +// Options can be used to a create a customized connection. +type Options struct { + NatsURL string + NatsConn *nats.Conn + ConnectTimeout time.Duration + AckTimeout time.Duration + DiscoverPrefix string + MaxPubAcksInflight int +} + +// DefaultOptions are the NATS Streaming client's default options +var DefaultOptions = Options{ + NatsURL: DefaultNatsURL, + ConnectTimeout: DefaultConnectWait, + AckTimeout: DefaultAckWait, + DiscoverPrefix: DefaultDiscoverPrefix, + MaxPubAcksInflight: DefaultMaxPubAcksInflight, +} + +// Option is a function on the options for a connection. +type Option func(*Options) error + +// NatsURL is an Option to set the URL the client should connect to. +func NatsURL(u string) Option { + return func(o *Options) error { + o.NatsURL = u + return nil + } +} + +// ConnectWait is an Option to set the timeout for establishing a connection. +func ConnectWait(t time.Duration) Option { + return func(o *Options) error { + o.ConnectTimeout = t + return nil + } +} + +// PubAckWait is an Option to set the timeout for waiting for an ACK for a +// published message. +func PubAckWait(t time.Duration) Option { + return func(o *Options) error { + o.AckTimeout = t + return nil + } +} + +// MaxPubAcksInflight is an Option to set the maximum number of published +// messages without outstanding ACKs from the server. +func MaxPubAcksInflight(max int) Option { + return func(o *Options) error { + o.MaxPubAcksInflight = max + return nil + } +} + +// NatsConn is an Option to set the underlying NATS connection to be used +// by a NATS Streaming Conn object. +func NatsConn(nc *nats.Conn) Option { + return func(o *Options) error { + o.NatsConn = nc + return nil + } +} + +// A conn represents a bare connection to a stan cluster. +type conn struct { + sync.RWMutex + clientID string + serverID string + pubPrefix string // Publish prefix set by stan, append our subject. + subRequests string // Subject to send subscription requests. + unsubRequests string // Subject to send unsubscribe requests. + subCloseRequests string // Subject to send subscription close requests. + closeRequests string // Subject to send close requests. + ackSubject string // publish acks + ackSubscription *nats.Subscription + hbSubscription *nats.Subscription + subMap map[string]*subscription + pubAckMap map[string]*ack + pubAckChan chan (struct{}) + opts Options + nc *nats.Conn + ncOwned bool // NATS Streaming created the connection, so needs to close it. +} + +// Closure for ack contexts. +type ack struct { + t *time.Timer + ah AckHandler + ch chan error +} + +// Connect will form a connection to the NATS Streaming subsystem. +func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) { + // Process Options + c := conn{clientID: clientID, opts: DefaultOptions} + for _, opt := range options { + if err := opt(&c.opts); err != nil { + return nil, err + } + } + // Check if the user has provided a connection as an option + c.nc = c.opts.NatsConn + // Create a NATS connection if it doesn't exist. + if c.nc == nil { + nc, err := nats.Connect(c.opts.NatsURL) + if err != nil { + return nil, err + } + c.nc = nc + c.ncOwned = true + } else if !c.nc.IsConnected() { + // Bail if the custom NATS connection is disconnected + return nil, ErrBadConnection + } + + // Create a heartbeat inbox + hbInbox := nats.NewInbox() + var err error + if c.hbSubscription, err = c.nc.Subscribe(hbInbox, c.processHeartBeat); err != nil { + c.Close() + return nil, err + } + + // Send Request to discover the cluster + discoverSubject := c.opts.DiscoverPrefix + "." + stanClusterID + req := &pb.ConnectRequest{ClientID: clientID, HeartbeatInbox: hbInbox} + b, _ := req.Marshal() + reply, err := c.nc.Request(discoverSubject, b, c.opts.ConnectTimeout) + if err != nil { + c.Close() + if err == nats.ErrTimeout { + return nil, ErrConnectReqTimeout + } + return nil, err + } + // Process the response, grab server pubPrefix + cr := &pb.ConnectResponse{} + err = cr.Unmarshal(reply.Data) + if err != nil { + c.Close() + return nil, err + } + if cr.Error != "" { + c.Close() + return nil, errors.New(cr.Error) + } + + // Capture cluster configuration endpoints to publish and subscribe/unsubscribe. + c.pubPrefix = cr.PubPrefix + c.subRequests = cr.SubRequests + c.unsubRequests = cr.UnsubRequests + c.subCloseRequests = cr.SubCloseRequests + c.closeRequests = cr.CloseRequests + + // Setup the ACK subscription + c.ackSubject = DefaultACKPrefix + "." + nuid.Next() + if c.ackSubscription, err = c.nc.Subscribe(c.ackSubject, c.processAck); err != nil { + c.Close() + return nil, err + } + c.ackSubscription.SetPendingLimits(1024*1024, 32*1024*1024) + c.pubAckMap = make(map[string]*ack) + + // Create Subscription map + c.subMap = make(map[string]*subscription) + + c.pubAckChan = make(chan struct{}, c.opts.MaxPubAcksInflight) + + // Attach a finalizer + runtime.SetFinalizer(&c, func(sc *conn) { sc.Close() }) + + return &c, nil +} + +// Close a connection to the stan system. +func (sc *conn) Close() error { + if sc == nil { + return ErrBadConnection + } + + sc.Lock() + defer sc.Unlock() + + if sc.nc == nil { + // We are already closed. + return nil + } + + // Capture for NATS calls below. + nc := sc.nc + if sc.ncOwned { + defer nc.Close() + } + + // Signals we are closed. + sc.nc = nil + + // Now close ourselves. + if sc.ackSubscription != nil { + sc.ackSubscription.Unsubscribe() + } + + req := &pb.CloseRequest{ClientID: sc.clientID} + b, _ := req.Marshal() + reply, err := nc.Request(sc.closeRequests, b, sc.opts.ConnectTimeout) + if err != nil { + if err == nats.ErrTimeout { + return ErrCloseReqTimeout + } + return err + } + cr := &pb.CloseResponse{} + err = cr.Unmarshal(reply.Data) + if err != nil { + return err + } + if cr.Error != "" { + return errors.New(cr.Error) + } + return nil +} + +// NatsConn returns the underlying NATS conn. Use this with care. For example, +// closing the wrapped NATS conn will put the NATS Streaming Conn in an invalid +// state. +func (sc *conn) NatsConn() *nats.Conn { + return sc.nc +} + +// Process a heartbeat from the NATS Streaming cluster +func (sc *conn) processHeartBeat(m *nats.Msg) { + // No payload assumed, just reply. + sc.RLock() + nc := sc.nc + sc.RUnlock() + if nc != nil { + nc.Publish(m.Reply, nil) + } +} + +// Process an ack from the NATS Streaming cluster +func (sc *conn) processAck(m *nats.Msg) { + pa := &pb.PubAck{} + err := pa.Unmarshal(m.Data) + if err != nil { + // FIXME, make closure to have context? + fmt.Printf("Error processing unmarshal\n") + return + } + + // Remove + a := sc.removeAck(pa.Guid) + if a != nil { + // Capture error if it exists. + if pa.Error != "" { + err = errors.New(pa.Error) + } + if a.ah != nil { + // Perform the ackHandler callback + a.ah(pa.Guid, err) + } else if a.ch != nil { + // Send to channel directly + a.ch <- err + } + } +} + +// Publish will publish to the cluster and wait for an ACK. +func (sc *conn) Publish(subject string, data []byte) error { + ch := make(chan error) + _, err := sc.publishAsync(subject, data, nil, ch) + if err == nil { + err = <-ch + } + return err +} + +// PublishAsync will publish to the cluster on pubPrefix+subject and asynchronously +// process the ACK or error state. It will return the GUID for the message being sent. +func (sc *conn) PublishAsync(subject string, data []byte, ah AckHandler) (string, error) { + return sc.publishAsync(subject, data, ah, nil) +} + +func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan error) (string, error) { + a := &ack{ah: ah, ch: ch} + sc.Lock() + if sc.nc == nil { + sc.Unlock() + return "", ErrConnectionClosed + } + + subj := sc.pubPrefix + "." + subject + // This is only what we need from PubMsg in the timer below, + // so do this so that pe doesn't escape (and we same on new object) + peGUID := nuid.Next() + pe := &pb.PubMsg{ClientID: sc.clientID, Guid: peGUID, Subject: subject, Data: data} + b, _ := pe.Marshal() + + // Map ack to guid. + sc.pubAckMap[peGUID] = a + // snapshot + ackSubject := sc.ackSubject + ackTimeout := sc.opts.AckTimeout + pac := sc.pubAckChan + sc.Unlock() + + // Use the buffered channel to control the number of outstanding acks. + pac <- struct{}{} + + err := sc.nc.PublishRequest(subj, ackSubject, b) + if err != nil { + sc.removeAck(peGUID) + return "", err + } + + // Setup the timer for expiration. + sc.Lock() + a.t = time.AfterFunc(ackTimeout, func() { + sc.removeAck(peGUID) + if a.ah != nil { + ah(peGUID, ErrTimeout) + } else if a.ch != nil { + a.ch <- ErrTimeout + } + }) + sc.Unlock() + + return peGUID, nil +} + +// removeAck removes the ack from the pubAckMap and cancels any state, e.g. timers +func (sc *conn) removeAck(guid string) *ack { + var t *time.Timer + sc.Lock() + a := sc.pubAckMap[guid] + if a != nil { + t = a.t + delete(sc.pubAckMap, guid) + } + pac := sc.pubAckChan + sc.Unlock() + + // Cancel timer if needed. + if t != nil { + t.Stop() + } + + // Remove from channel to unblock PublishAsync + if a != nil && len(pac) > 0 { + <-pac + } + return a +} + +// Process an msg from the NATS Streaming cluster +func (sc *conn) processMsg(raw *nats.Msg) { + msg := &Msg{} + err := msg.Unmarshal(raw.Data) + if err != nil { + panic("Error processing unmarshal for msg") + } + // Lookup the subscription + sc.RLock() + nc := sc.nc + isClosed := nc == nil + sub := sc.subMap[raw.Subject] + sc.RUnlock() + + // Check if sub is no longer valid or connection has been closed. + if sub == nil || isClosed { + return + } + + // Store in msg for backlink + msg.Sub = sub + + sub.RLock() + cb := sub.cb + ackSubject := sub.ackInbox + isManualAck := sub.opts.ManualAcks + subsc := sub.sc // Can be nil if sub has been unsubscribed. + sub.RUnlock() + + // Perform the callback + if cb != nil && subsc != nil { + cb(msg) + } + + // Proces auto-ack + if !isManualAck && nc != nil { + ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence} + b, _ := ack.Marshal() + if err := nc.Publish(ackSubject, b); err != nil { + // FIXME(dlc) - Async error handler? Retry? + } + } +} diff --git a/vendor/github.com/nats-io/go-nats-streaming/sub.go b/vendor/github.com/nats-io/go-nats-streaming/sub.go new file mode 100644 index 0000000..b545ac7 --- /dev/null +++ b/vendor/github.com/nats-io/go-nats-streaming/sub.go @@ -0,0 +1,376 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io). +package stan + +import ( + "errors" + "sync" + "time" + + "github.com/nats-io/go-nats" + "github.com/nats-io/go-nats-streaming/pb" +) + +const ( + // DefaultAckWait indicates how long the server should wait for an ACK before resending a message + DefaultAckWait = 30 * time.Second + // DefaultMaxInflight indicates how many messages with outstanding ACKs the server can send + DefaultMaxInflight = 1024 +) + +// Msg is the client defined message, which includes proto, then back link to subscription. +type Msg struct { + pb.MsgProto // MsgProto: Seq, Subject, Reply[opt], Data, Timestamp, CRC32[opt] + Sub Subscription +} + +// Subscriptions and Options + +// Subscription represents a subscription within the NATS Streaming cluster. Subscriptions +// will be rate matched and follow at-least delivery semantics. +type Subscription interface { + // Unsubscribe removes interest in the subscription. + // For durables, it means that the durable interest is also removed from + // the server. Restarting a durable with the same name will not resume + // the subscription, it will be considered a new one. + Unsubscribe() error + + // Close removes this subscriber from the server, but unlike Unsubscribe(), + // the durable interest is not removed. If the client has connected to a server + // for which this feature is not available, Close() will return a ErrNoServerSupport + // error. + Close() error +} + +// A subscription represents a subscription to a stan cluster. +type subscription struct { + sync.RWMutex + sc *conn + subject string + qgroup string + inbox string + ackInbox string + inboxSub *nats.Subscription + opts SubscriptionOptions + cb MsgHandler +} + +// SubscriptionOption is a function on the options for a subscription. +type SubscriptionOption func(*SubscriptionOptions) error + +// MsgHandler is a callback function that processes messages delivered to +// asynchronous subscribers. +type MsgHandler func(msg *Msg) + +// SubscriptionOptions are used to control the Subscription's behavior. +type SubscriptionOptions struct { + // DurableName, if set will survive client restarts. + DurableName string + // Controls the number of messages the cluster will have inflight without an ACK. + MaxInflight int + // Controls the time the cluster will wait for an ACK for a given message. + AckWait time.Duration + // StartPosition enum from proto. + StartAt pb.StartPosition + // Optional start sequence number. + StartSequence uint64 + // Optional start time. + StartTime time.Time + // Option to do Manual Acks + ManualAcks bool +} + +// DefaultSubscriptionOptions are the default subscriptions' options +var DefaultSubscriptionOptions = SubscriptionOptions{ + MaxInflight: DefaultMaxInflight, + AckWait: DefaultAckWait, +} + +// MaxInflight is an Option to set the maximum number of messages the cluster will send +// without an ACK. +func MaxInflight(m int) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.MaxInflight = m + return nil + } +} + +// AckWait is an Option to set the timeout for waiting for an ACK from the cluster's +// point of view for delivered messages. +func AckWait(t time.Duration) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.AckWait = t + return nil + } +} + +// StartAt sets the desired start position for the message stream. +func StartAt(sp pb.StartPosition) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = sp + return nil + } +} + +// StartAtSequence sets the desired start sequence position and state. +func StartAtSequence(seq uint64) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = pb.StartPosition_SequenceStart + o.StartSequence = seq + return nil + } +} + +// StartAtTime sets the desired start time position and state. +func StartAtTime(start time.Time) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = pb.StartPosition_TimeDeltaStart + o.StartTime = start + return nil + } +} + +// StartAtTimeDelta sets the desired start time position and state using the delta. +func StartAtTimeDelta(ago time.Duration) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = pb.StartPosition_TimeDeltaStart + o.StartTime = time.Now().Add(-ago) + return nil + } +} + +// StartWithLastReceived is a helper function to set start position to last received. +func StartWithLastReceived() SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = pb.StartPosition_LastReceived + return nil + } +} + +// DeliverAllAvailable will deliver all messages available. +func DeliverAllAvailable() SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.StartAt = pb.StartPosition_First + return nil + } +} + +// SetManualAckMode will allow clients to control their own acks to delivered messages. +func SetManualAckMode() SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.ManualAcks = true + return nil + } +} + +// DurableName sets the DurableName for the subcriber. +func DurableName(name string) SubscriptionOption { + return func(o *SubscriptionOptions) error { + o.DurableName = name + return nil + } +} + +// Subscribe will perform a subscription with the given options to the NATS Streaming cluster. +func (sc *conn) Subscribe(subject string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) { + return sc.subscribe(subject, "", cb, options...) +} + +// QueueSubscribe will perform a queue subscription with the given options to the NATS Streaming cluster. +func (sc *conn) QueueSubscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) { + return sc.subscribe(subject, qgroup, cb, options...) +} + +// subscribe will perform a subscription with the given options to the NATS Streaming cluster. +func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) { + sub := &subscription{subject: subject, qgroup: qgroup, inbox: nats.NewInbox(), cb: cb, sc: sc, opts: DefaultSubscriptionOptions} + for _, opt := range options { + if err := opt(&sub.opts); err != nil { + return nil, err + } + } + sc.Lock() + if sc.nc == nil { + sc.Unlock() + return nil, ErrConnectionClosed + } + + // Register subscription. + sc.subMap[sub.inbox] = sub + nc := sc.nc + sc.Unlock() + + // Hold lock throughout. + sub.Lock() + defer sub.Unlock() + + // Listen for actual messages. + nsub, err := nc.Subscribe(sub.inbox, sc.processMsg) + if err != nil { + return nil, err + } + sub.inboxSub = nsub + + // Create a subscription request + // FIXME(dlc) add others. + sr := &pb.SubscriptionRequest{ + ClientID: sc.clientID, + Subject: subject, + QGroup: qgroup, + Inbox: sub.inbox, + MaxInFlight: int32(sub.opts.MaxInflight), + AckWaitInSecs: int32(sub.opts.AckWait / time.Second), + StartPosition: sub.opts.StartAt, + DurableName: sub.opts.DurableName, + } + + // Conditionals + switch sr.StartPosition { + case pb.StartPosition_TimeDeltaStart: + sr.StartTimeDelta = time.Now().UnixNano() - sub.opts.StartTime.UnixNano() + case pb.StartPosition_SequenceStart: + sr.StartSequence = sub.opts.StartSequence + } + + b, _ := sr.Marshal() + reply, err := sc.nc.Request(sc.subRequests, b, sc.opts.ConnectTimeout) + if err != nil { + sub.inboxSub.Unsubscribe() + if err == nats.ErrTimeout { + err = ErrSubReqTimeout + } + return nil, err + } + r := &pb.SubscriptionResponse{} + if err := r.Unmarshal(reply.Data); err != nil { + sub.inboxSub.Unsubscribe() + return nil, err + } + if r.Error != "" { + sub.inboxSub.Unsubscribe() + return nil, errors.New(r.Error) + } + sub.ackInbox = r.AckInbox + + return sub, nil +} + +// closeOrUnsubscribe performs either close or unsubsribe based on +// given boolean. +func (sub *subscription) closeOrUnsubscribe(doClose bool) error { + if sub == nil { + return ErrBadSubscription + } + sub.Lock() + sc := sub.sc + if sc == nil { + // Already closed. + sub.Unlock() + return ErrBadSubscription + } + sub.sc = nil + sub.inboxSub.Unsubscribe() + sub.inboxSub = nil + sub.Unlock() + + if sc == nil { + return ErrBadSubscription + } + + sc.Lock() + if sc.nc == nil { + sc.Unlock() + return ErrConnectionClosed + } + + delete(sc.subMap, sub.inbox) + reqSubject := sc.unsubRequests + if doClose { + reqSubject = sc.subCloseRequests + if reqSubject == "" { + sc.Unlock() + return ErrNoServerSupport + } + } + + // Snapshot connection to avoid data race, since the connection may be + // closing while we try to send the request + nc := sc.nc + sc.Unlock() + + usr := &pb.UnsubscribeRequest{ + ClientID: sc.clientID, + Subject: sub.subject, + Inbox: sub.ackInbox, + } + b, _ := usr.Marshal() + reply, err := nc.Request(reqSubject, b, sc.opts.ConnectTimeout) + if err != nil { + if err == nats.ErrTimeout { + if doClose { + return ErrCloseReqTimeout + } + return ErrUnsubReqTimeout + } + return err + } + r := &pb.SubscriptionResponse{} + if err := r.Unmarshal(reply.Data); err != nil { + return err + } + if r.Error != "" { + return errors.New(r.Error) + } + + return nil +} + +// Unsubscribe implements the Subscription interface +func (sub *subscription) Unsubscribe() error { + return sub.closeOrUnsubscribe(false) +} + +// Close implements the Subscription interface +func (sub *subscription) Close() error { + return sub.closeOrUnsubscribe(true) +} + +// Ack manually acknowledges a message. +// The subscriber had to be created with SetManualAckMode() option. +func (msg *Msg) Ack() error { + if msg == nil { + return ErrNilMsg + } + // Look up subscription + sub := msg.Sub.(*subscription) + if sub == nil { + return ErrBadSubscription + } + + sub.RLock() + ackSubject := sub.ackInbox + isManualAck := sub.opts.ManualAcks + sc := sub.sc + sub.RUnlock() + + // Check for error conditions. + if sc == nil { + return ErrBadSubscription + } + // Get nc from the connection (needs locking to avoid race) + sc.RLock() + nc := sc.nc + sc.RUnlock() + if nc == nil { + return ErrBadConnection + } + if !isManualAck { + return ErrManualAck + } + + // Ack here. + ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence} + b, _ := ack.Marshal() + return nc.Publish(ackSubject, b) +}