Merge pull request #501 from mlaventure/new-shim-continued

New shim continued
This commit is contained in:
Michael Crosby 2017-02-07 15:52:08 -08:00 committed by GitHub
commit 42a17f9391
21 changed files with 1565 additions and 1442 deletions

File diff suppressed because it is too large Load diff

View file

@ -6,14 +6,14 @@ import "google/protobuf/empty.proto";
import "gogoproto/gogo.proto";
service ExecutionService {
rpc Create(CreateContainerRequest) returns (CreateContainerResponse);
rpc Start(StartContainerRequest) returns (google.protobuf.Empty);
rpc Update(UpdateContainerRequest) returns (google.protobuf.Empty);
rpc Pause(PauseContainerRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeContainerRequest) returns (google.protobuf.Empty);
rpc Delete(DeleteContainerRequest) returns (google.protobuf.Empty);
rpc Get(GetContainerRequest) returns (GetContainerResponse);
rpc List(ListContainersRequest) returns (ListContainersResponse);
rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse);
rpc StartContainer(StartContainerRequest) returns (google.protobuf.Empty);
rpc UpdateContainer(UpdateContainerRequest) returns (google.protobuf.Empty);
rpc PauseContainer(PauseContainerRequest) returns (google.protobuf.Empty);
rpc ResumeContainer(ResumeContainerRequest) returns (google.protobuf.Empty);
rpc DeleteContainer(DeleteContainerRequest) returns (google.protobuf.Empty);
rpc GetContainer(GetContainerRequest) returns (GetContainerResponse);
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse);
rpc StartProcess(StartProcessRequest) returns (StartProcessResponse);
rpc GetProcess(GetProcessRequest) returns (GetProcessResponse);
@ -43,7 +43,6 @@ message CreateContainerResponse {
message DeleteContainerRequest {
string id = 1 [(gogoproto.customname) = "ID"];
uint32 pid = 2;
}
message ListContainersRequest {
@ -74,14 +73,13 @@ message Container {
}
message Process {
string id = 1 [(gogoproto.customname) = "ID"];
uint32 pid = 2;
repeated string args = 3;
repeated string env = 4;
User user = 5;
string cwd = 6;
bool terminal = 7;
uint32 exit_status = 8;
uint32 pid = 1;
repeated string args = 2;
repeated string env = 3;
User user = 4;
string cwd = 5;
bool terminal = 6;
uint32 exit_status = 7;
}
enum Status {
@ -120,7 +118,7 @@ message ResumeContainerRequest {
message GetProcessRequest {
string container_id = 1 [(gogoproto.customname) = "ContainerID"];
string process_id = 2 [(gogoproto.customname) = "ProcessID"];
uint32 pid = 2;
}
message GetProcessResponse {
@ -129,17 +127,17 @@ message GetProcessResponse {
message SignalProcessRequest {
string container_id = 1 [(gogoproto.customname) = "ContainerID"];
string process_id = 2 [(gogoproto.customname) = "ProcessID"];
uint32 pid = 2;
uint32 signal = 3;
}
message DeleteProcessRequest {
string container_id = 1 [(gogoproto.customname) = "ContainerID"];
string process_id = 2 [(gogoproto.customname) = "ProcessID"];
uint32 pid = 2;
}
message ListProcessesRequest {
string id = 1 [(gogoproto.customname) = "ID"];
string container_id = 1 [(gogoproto.customname) = "ContainerID"];
}
message ListProcessesResponse {

View file

@ -249,7 +249,8 @@ func (*StateRequest) Descriptor() ([]byte, []int) { return fileDescriptorShim, [
type StateResponse struct {
ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Bundle string `protobuf:"bytes,2,opt,name=bundle,proto3" json:"bundle,omitempty"`
Processes []*Process `protobuf:"bytes,3,rep,name=processes" json:"processes,omitempty"`
InitPid uint32 `protobuf:"varint,3,opt,name=initPid,proto3" json:"initPid,omitempty"`
Processes []*Process `protobuf:"bytes,4,rep,name=processes" json:"processes,omitempty"`
}
func (m *StateResponse) Reset() { *m = StateResponse{} }
@ -463,10 +464,11 @@ func (this *StateResponse) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s := make([]string, 0, 8)
s = append(s, "&shim.StateResponse{")
s = append(s, "ID: "+fmt.Sprintf("%#v", this.ID)+",\n")
s = append(s, "Bundle: "+fmt.Sprintf("%#v", this.Bundle)+",\n")
s = append(s, "InitPid: "+fmt.Sprintf("%#v", this.InitPid)+",\n")
if this.Processes != nil {
s = append(s, "Processes: "+fmt.Sprintf("%#v", this.Processes)+",\n")
}
@ -1432,9 +1434,14 @@ func (m *StateResponse) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintShim(dAtA, i, uint64(len(m.Bundle)))
i += copy(dAtA[i:], m.Bundle)
}
if m.InitPid != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintShim(dAtA, i, uint64(m.InitPid))
}
if len(m.Processes) > 0 {
for _, msg := range m.Processes {
dAtA[i] = 0x1a
dAtA[i] = 0x22
i++
i = encodeVarintShim(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
@ -1772,6 +1779,9 @@ func (m *StateResponse) Size() (n int) {
if l > 0 {
n += 1 + l + sovShim(uint64(l))
}
if m.InitPid != 0 {
n += 1 + sovShim(uint64(m.InitPid))
}
if len(m.Processes) > 0 {
for _, e := range m.Processes {
l = e.Size()
@ -1980,6 +1990,7 @@ func (this *StateResponse) String() string {
s := strings.Join([]string{`&StateResponse{`,
`ID:` + fmt.Sprintf("%v", this.ID) + `,`,
`Bundle:` + fmt.Sprintf("%v", this.Bundle) + `,`,
`InitPid:` + fmt.Sprintf("%v", this.InitPid) + `,`,
`Processes:` + strings.Replace(fmt.Sprintf("%v", this.Processes), "Process", "Process", 1) + `,`,
`}`,
}, "")
@ -3725,6 +3736,25 @@ func (m *StateResponse) Unmarshal(dAtA []byte) error {
m.Bundle = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field InitPid", wireType)
}
m.InitPid = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.InitPid |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Processes", wireType)
}
@ -4072,69 +4102,70 @@ var (
func init() { proto.RegisterFile("shim.proto", fileDescriptorShim) }
var fileDescriptorShim = []byte{
// 1018 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x55, 0xcf, 0x6e, 0xe3, 0xb6,
0x13, 0x8e, 0x2c, 0xf9, 0xdf, 0x38, 0x72, 0xfc, 0x23, 0x16, 0x81, 0xe2, 0xfd, 0xd5, 0x71, 0xd4,
0x43, 0xd3, 0xa0, 0x70, 0x9a, 0xb4, 0x97, 0x16, 0xe8, 0x21, 0x1b, 0x0b, 0xdb, 0x00, 0x69, 0x62,
0xd0, 0x0e, 0xb0, 0x37, 0x43, 0x89, 0x18, 0x9b, 0x80, 0x2c, 0xa9, 0x24, 0x95, 0x3f, 0x3d, 0xf5,
0xd2, 0x47, 0xe8, 0x53, 0xf4, 0x45, 0xf6, 0xd8, 0x53, 0xd1, 0x53, 0xd1, 0xe4, 0x09, 0xfa, 0x08,
0x05, 0x49, 0xc9, 0x4e, 0x36, 0xd2, 0x02, 0xbd, 0xcd, 0x7c, 0xfa, 0x34, 0x9c, 0x6f, 0x38, 0x33,
0x04, 0xe0, 0x73, 0xba, 0x18, 0x24, 0x2c, 0x16, 0x31, 0x42, 0x57, 0x71, 0x24, 0x7c, 0x1a, 0x11,
0x16, 0x0c, 0x14, 0x7c, 0x73, 0xd0, 0x7d, 0x3d, 0x8b, 0xe3, 0x59, 0x48, 0xf6, 0x15, 0xe3, 0x32,
0xbd, 0xde, 0x27, 0x8b, 0x44, 0xdc, 0xeb, 0x1f, 0xba, 0xaf, 0x66, 0xf1, 0x2c, 0x56, 0xe6, 0xbe,
0xb4, 0x34, 0xea, 0xfe, 0x61, 0x80, 0x7d, 0xcc, 0x88, 0x2f, 0x08, 0x26, 0x3f, 0xa6, 0x84, 0x0b,
0xb4, 0x09, 0x15, 0x1a, 0x38, 0x46, 0xdf, 0xd8, 0x6d, 0xbe, 0xa9, 0x3d, 0xfe, 0xb5, 0x5d, 0x39,
0x19, 0xe2, 0x0a, 0x0d, 0xd0, 0x26, 0xd4, 0x2e, 0xd3, 0x28, 0x08, 0x89, 0x53, 0x91, 0xdf, 0x70,
0xe6, 0x21, 0x07, 0xea, 0x2c, 0x8d, 0x04, 0x5d, 0x10, 0xc7, 0x54, 0x1f, 0x72, 0x17, 0x6d, 0x41,
0x23, 0x8a, 0xa7, 0x09, 0xbd, 0x89, 0x85, 0x63, 0xf5, 0x8d, 0xdd, 0x06, 0xae, 0x47, 0xf1, 0x48,
0xba, 0xa8, 0x0b, 0x0d, 0x41, 0xd8, 0x82, 0x46, 0x7e, 0xe8, 0x54, 0xd5, 0xa7, 0xa5, 0x8f, 0x5e,
0x41, 0x95, 0x8b, 0x80, 0x46, 0x4e, 0x4d, 0x85, 0xd3, 0x8e, 0x3c, 0x9e, 0x8b, 0x20, 0x4e, 0x85,
0x53, 0xd7, 0xc7, 0x6b, 0x2f, 0xc3, 0x09, 0x63, 0x4e, 0x63, 0x89, 0x13, 0xc6, 0x5c, 0x17, 0xda,
0xb9, 0x2e, 0x9e, 0xc4, 0x11, 0x27, 0xa8, 0x03, 0x66, 0x92, 0x29, 0xb3, 0xb1, 0x34, 0xdd, 0x36,
0xac, 0x8f, 0x85, 0xcf, 0x44, 0x26, 0xdd, 0xdd, 0x01, 0x7b, 0x48, 0x42, 0xb2, 0xaa, 0xc5, 0xcb,
0x5f, 0x0e, 0xa0, 0x9d, 0x53, 0xb2, 0xb0, 0xdb, 0xd0, 0x22, 0x77, 0x54, 0x4c, 0xb9, 0xf0, 0x45,
0xca, 0x33, 0x2e, 0x48, 0x68, 0xac, 0x10, 0xf7, 0x37, 0x13, 0x5a, 0xde, 0x1d, 0xb9, 0xca, 0x83,
0x3e, 0xd5, 0x6e, 0x94, 0x69, 0xaf, 0x14, 0x6b, 0x37, 0x4b, 0xb4, 0x5b, 0x4f, 0xb5, 0xa3, 0x4f,
0xc1, 0xe6, 0x24, 0xa4, 0x51, 0x7a, 0x37, 0x0d, 0xfd, 0x4b, 0xa2, 0x4b, 0xdc, 0xc4, 0xeb, 0x19,
0x78, 0x2a, 0x31, 0xf4, 0x05, 0x58, 0x29, 0x27, 0x4c, 0x55, 0xb9, 0x75, 0xe8, 0x0c, 0x5e, 0xf6,
0xd3, 0xe0, 0x82, 0x13, 0x86, 0x15, 0x0b, 0x21, 0xb0, 0x7c, 0x36, 0xe3, 0x4e, 0xbd, 0x6f, 0xee,
0x36, 0xb1, 0xb2, 0x65, 0x75, 0x48, 0x74, 0xe3, 0x34, 0x14, 0x24, 0x4d, 0x89, 0x5c, 0xdd, 0x06,
0x4e, 0x53, 0x1d, 0x27, 0x4d, 0xe4, 0xc2, 0xfa, 0x95, 0x9f, 0xf8, 0x97, 0x34, 0xa4, 0x82, 0x12,
0xee, 0x80, 0x22, 0x3f, 0xc3, 0xd0, 0xd7, 0x50, 0x67, 0x21, 0x5d, 0x50, 0xc1, 0x9d, 0x56, 0xdf,
0xdc, 0x6d, 0x1d, 0x76, 0x8b, 0x92, 0xc1, 0x8a, 0x82, 0x73, 0x2a, 0xda, 0x83, 0xff, 0x45, 0xf1,
0x34, 0x22, 0xb7, 0xd3, 0x84, 0xd1, 0x1b, 0x1a, 0x92, 0x19, 0xe1, 0xce, 0xba, 0xaa, 0xe7, 0x46,
0x14, 0x9f, 0x91, 0xdb, 0xd1, 0x12, 0x46, 0x9f, 0x43, 0xc7, 0x4f, 0x12, 0x9f, 0x2d, 0x62, 0x36,
0x4d, 0x58, 0x7c, 0x4d, 0x43, 0xe2, 0xd8, 0x2a, 0xc9, 0x8d, 0x1c, 0x1f, 0x69, 0xd8, 0x1d, 0x83,
0x25, 0x65, 0x4b, 0x29, 0xe9, 0xea, 0xea, 0x53, 0x1a, 0x48, 0x64, 0x46, 0x03, 0x75, 0x33, 0x36,
0x96, 0x26, 0xfa, 0x0c, 0x36, 0xfc, 0x20, 0xa0, 0x82, 0xc6, 0x91, 0x1f, 0x4e, 0x67, 0x34, 0xe0,
0x8e, 0xd9, 0x37, 0x77, 0x6d, 0xdc, 0x5e, 0xc1, 0x6f, 0x69, 0xc0, 0xdd, 0x21, 0xd4, 0x74, 0xfa,
0xb2, 0x8e, 0xe2, 0x3e, 0x21, 0x7a, 0xbe, 0xb0, 0xb2, 0x25, 0x36, 0xf7, 0x99, 0x8e, 0x6c, 0x61,
0x65, 0x4b, 0x8c, 0xc7, 0xd7, 0xfa, 0xc2, 0x2d, 0xac, 0x6c, 0xb7, 0x0f, 0xeb, 0xba, 0x8f, 0x4a,
0x1b, 0xfa, 0x14, 0x60, 0x24, 0xee, 0x4b, 0xbb, 0x57, 0xb6, 0xd7, 0x2d, 0x0d, 0xc4, 0x3c, 0x13,
0xa1, 0x1d, 0xd9, 0x46, 0x73, 0x42, 0x67, 0x73, 0x7d, 0x9a, 0x8d, 0x33, 0xcf, 0xdd, 0x00, 0xdb,
0xbb, 0x21, 0x91, 0xe0, 0xf9, 0x7c, 0xfc, 0x62, 0x40, 0x55, 0x21, 0xa5, 0x4b, 0xe2, 0x20, 0x93,
0x27, 0xe3, 0xb7, 0x0f, 0x3f, 0x29, 0xba, 0x47, 0x15, 0x60, 0x72, 0x9f, 0x90, 0x4c, 0x7d, 0x96,
0xa5, 0xb9, 0xca, 0xf2, 0x83, 0x89, 0xb2, 0x5e, 0x4c, 0x94, 0x9e, 0xdb, 0xe5, 0x98, 0xba, 0x3f,
0x81, 0x9d, 0xf9, 0x59, 0x65, 0xfe, 0xeb, 0x0e, 0xfb, 0x06, 0x9a, 0x09, 0x8b, 0xaf, 0x08, 0xe7,
0x44, 0x5f, 0x61, 0xeb, 0xf0, 0x75, 0x51, 0xee, 0x23, 0x4d, 0xc2, 0x2b, 0xb6, 0x7b, 0x0a, 0xf5,
0x0c, 0x2d, 0xa8, 0xf7, 0xbe, 0x1c, 0x67, 0x5f, 0xe4, 0xf5, 0xd8, 0x2a, 0x8a, 0xa9, 0x33, 0xd7,
0x3c, 0xa9, 0x6c, 0xe4, 0xa7, 0x7c, 0xa9, 0x6c, 0x03, 0x6c, 0x4c, 0x78, 0xba, 0xc8, 0x81, 0x3d,
0x0f, 0x9a, 0xcb, 0x02, 0xa2, 0x06, 0x58, 0xde, 0xbb, 0x93, 0x49, 0x67, 0x0d, 0xd5, 0xc1, 0x3c,
0x3f, 0xff, 0xa1, 0x63, 0x20, 0x80, 0xda, 0x31, 0xf6, 0x8e, 0x26, 0x5e, 0xa7, 0x82, 0x9a, 0x50,
0x1d, 0x4f, 0x8e, 0xf0, 0xa4, 0x63, 0xa2, 0x36, 0x80, 0xf7, 0xce, 0x3b, 0x9e, 0x1e, 0x0d, 0x87,
0xde, 0xb0, 0x63, 0xed, 0x7d, 0x0b, 0x55, 0x75, 0x2e, 0x6a, 0x41, 0x7d, 0x3c, 0x39, 0x1f, 0x8d,
0xbc, 0x61, 0x67, 0x4d, 0x3a, 0xf8, 0xe2, 0xec, 0xec, 0xe4, 0xec, 0xad, 0x8e, 0x34, 0x3a, 0xba,
0x18, 0x7b, 0xc3, 0x4e, 0x45, 0x7e, 0xd0, 0x51, 0x87, 0x1d, 0xf3, 0xf0, 0xd7, 0x2a, 0x58, 0xe3,
0x39, 0x5d, 0xa0, 0x73, 0xa8, 0xe9, 0x15, 0x8b, 0x76, 0x8a, 0x84, 0x3d, 0x7b, 0x56, 0xba, 0xee,
0xc7, 0x28, 0xd9, 0xb5, 0x1d, 0xa9, 0xac, 0x98, 0x40, 0xfd, 0x92, 0x42, 0x2d, 0x57, 0x75, 0x77,
0x73, 0xa0, 0xdf, 0xba, 0x41, 0xfe, 0xd6, 0x0d, 0x3c, 0xf9, 0xd6, 0xc9, 0x9c, 0xf4, 0x7e, 0x2e,
0xce, 0xe9, 0xd9, 0x7a, 0x2f, 0xce, 0xe9, 0x83, 0xf5, 0x7e, 0x02, 0x96, 0x1c, 0x3a, 0xb4, 0x5d,
0xd8, 0xcb, 0xab, 0xb5, 0xde, 0xed, 0x97, 0x13, 0xb2, 0x50, 0xdf, 0x81, 0x39, 0x12, 0xf7, 0xa8,
0x57, 0xd8, 0x59, 0xcb, 0xb1, 0x2d, 0x95, 0xf6, 0x3d, 0xd4, 0xf4, 0x38, 0x16, 0x4b, 0x7b, 0x36,
0xaa, 0xdd, 0xad, 0x52, 0xca, 0x97, 0x06, 0x3a, 0xcd, 0x6f, 0xbf, 0x5f, 0xde, 0x90, 0x59, 0x9c,
0x9d, 0x8f, 0x30, 0x56, 0xb7, 0xa6, 0x7a, 0xb6, 0x38, 0xda, 0xd3, 0x76, 0x2e, 0x95, 0x76, 0x0c,
0x35, 0xdd, 0xe6, 0xc5, 0xd2, 0x9e, 0x8d, 0x40, 0x59, 0x90, 0x37, 0xff, 0x7f, 0xff, 0xd0, 0x5b,
0xfb, 0xf3, 0xa1, 0xb7, 0xf6, 0xcf, 0x43, 0xcf, 0xf8, 0xf9, 0xb1, 0x67, 0xbc, 0x7f, 0xec, 0x19,
0xbf, 0x3f, 0xf6, 0x8c, 0xbf, 0x1f, 0x7b, 0xc6, 0x65, 0x4d, 0xb1, 0xbf, 0xfa, 0x37, 0x00, 0x00,
0xff, 0xff, 0x62, 0xce, 0xef, 0x76, 0x44, 0x09, 0x00, 0x00,
// 1027 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x55, 0xcf, 0x6e, 0xe3, 0xb6,
0x13, 0x8e, 0x2c, 0xf9, 0xdf, 0x38, 0x76, 0xfc, 0x23, 0x16, 0x81, 0xe2, 0xfd, 0xd5, 0x71, 0xd4,
0x43, 0xd3, 0xa0, 0x70, 0x9a, 0xb4, 0x97, 0x16, 0xe8, 0x21, 0x1b, 0x0b, 0xdb, 0x00, 0x69, 0x22,
0xd0, 0x0e, 0xb0, 0x37, 0x43, 0x89, 0x18, 0x9b, 0x80, 0x2c, 0xa9, 0x24, 0x95, 0x3f, 0xb7, 0x5e,
0xfa, 0x06, 0xed, 0x53, 0xf4, 0x45, 0xf6, 0xd8, 0x53, 0xd1, 0x53, 0xd1, 0xe4, 0x09, 0xfa, 0x08,
0x05, 0x49, 0xc9, 0x4e, 0x36, 0xd2, 0x1e, 0x7a, 0x9b, 0xf9, 0xf4, 0x69, 0x38, 0xdf, 0x70, 0x66,
0x08, 0xc0, 0xe7, 0x74, 0x31, 0x4c, 0x58, 0x2c, 0x62, 0x84, 0xae, 0xe2, 0x48, 0xf8, 0x34, 0x22,
0x2c, 0x18, 0x2a, 0xf8, 0xe6, 0xa0, 0xf7, 0x7a, 0x16, 0xc7, 0xb3, 0x90, 0xec, 0x2b, 0xc6, 0x65,
0x7a, 0xbd, 0x4f, 0x16, 0x89, 0xb8, 0xd7, 0x3f, 0xf4, 0x5e, 0xcd, 0xe2, 0x59, 0xac, 0xcc, 0x7d,
0x69, 0x69, 0xd4, 0xf9, 0xc3, 0x80, 0xf6, 0x31, 0x23, 0xbe, 0x20, 0x98, 0xfc, 0x98, 0x12, 0x2e,
0xd0, 0x26, 0x54, 0x68, 0x60, 0x1b, 0x03, 0x63, 0xb7, 0xf9, 0xa6, 0xf6, 0xf8, 0xd7, 0x76, 0xe5,
0x64, 0x84, 0x2b, 0x34, 0x40, 0x9b, 0x50, 0xbb, 0x4c, 0xa3, 0x20, 0x24, 0x76, 0x45, 0x7e, 0xc3,
0x99, 0x87, 0x6c, 0xa8, 0xb3, 0x34, 0x12, 0x74, 0x41, 0x6c, 0x53, 0x7d, 0xc8, 0x5d, 0xb4, 0x05,
0x8d, 0x28, 0x9e, 0x26, 0xf4, 0x26, 0x16, 0xb6, 0x35, 0x30, 0x76, 0x1b, 0xb8, 0x1e, 0xc5, 0x9e,
0x74, 0x51, 0x0f, 0x1a, 0x82, 0xb0, 0x05, 0x8d, 0xfc, 0xd0, 0xae, 0xaa, 0x4f, 0x4b, 0x1f, 0xbd,
0x82, 0x2a, 0x17, 0x01, 0x8d, 0xec, 0x9a, 0x0a, 0xa7, 0x1d, 0x79, 0x3c, 0x17, 0x41, 0x9c, 0x0a,
0xbb, 0xae, 0x8f, 0xd7, 0x5e, 0x86, 0x13, 0xc6, 0xec, 0xc6, 0x12, 0x27, 0x8c, 0x39, 0x0e, 0x74,
0x72, 0x5d, 0x3c, 0x89, 0x23, 0x4e, 0x50, 0x17, 0xcc, 0x24, 0x53, 0xd6, 0xc6, 0xd2, 0x74, 0x3a,
0xb0, 0x3e, 0x16, 0x3e, 0x13, 0x99, 0x74, 0x67, 0x07, 0xda, 0x23, 0x12, 0x92, 0x55, 0x2d, 0x5e,
0xfe, 0x72, 0x00, 0x9d, 0x9c, 0x92, 0x85, 0xdd, 0x86, 0x16, 0xb9, 0xa3, 0x62, 0xca, 0x85, 0x2f,
0x52, 0x9e, 0x71, 0x41, 0x42, 0x63, 0x85, 0x38, 0xbf, 0x99, 0xd0, 0x72, 0xef, 0xc8, 0x55, 0x1e,
0xf4, 0xa9, 0x76, 0xa3, 0x4c, 0x7b, 0xa5, 0x58, 0xbb, 0x59, 0xa2, 0xdd, 0x7a, 0xaa, 0x1d, 0x7d,
0x0a, 0x6d, 0x4e, 0x42, 0x1a, 0xa5, 0x77, 0xd3, 0xd0, 0xbf, 0x24, 0xba, 0xc4, 0x4d, 0xbc, 0x9e,
0x81, 0xa7, 0x12, 0x43, 0x5f, 0x80, 0x95, 0x72, 0xc2, 0x54, 0x95, 0x5b, 0x87, 0xf6, 0xf0, 0x65,
0x3f, 0x0d, 0x2f, 0x38, 0x61, 0x58, 0xb1, 0x10, 0x02, 0xcb, 0x67, 0x33, 0x6e, 0xd7, 0x07, 0xe6,
0x6e, 0x13, 0x2b, 0x5b, 0x56, 0x87, 0x44, 0x37, 0x76, 0x43, 0x41, 0xd2, 0x94, 0xc8, 0xd5, 0x6d,
0x60, 0x37, 0xd5, 0x71, 0xd2, 0x44, 0x0e, 0xac, 0x5f, 0xf9, 0x89, 0x7f, 0x49, 0x43, 0x2a, 0x28,
0xe1, 0x36, 0x28, 0xf2, 0x33, 0x0c, 0x7d, 0x0d, 0x75, 0x16, 0xd2, 0x05, 0x15, 0xdc, 0x6e, 0x0d,
0xcc, 0xdd, 0xd6, 0x61, 0xaf, 0x28, 0x19, 0xac, 0x28, 0x38, 0xa7, 0xa2, 0x3d, 0xf8, 0x5f, 0x14,
0x4f, 0x23, 0x72, 0x3b, 0x4d, 0x18, 0xbd, 0xa1, 0x21, 0x99, 0x11, 0x6e, 0xaf, 0xab, 0x7a, 0x6e,
0x44, 0xf1, 0x19, 0xb9, 0xf5, 0x96, 0x30, 0xfa, 0x1c, 0xba, 0x7e, 0x92, 0xf8, 0x6c, 0x11, 0xb3,
0x69, 0xc2, 0xe2, 0x6b, 0x1a, 0x12, 0xbb, 0xad, 0x92, 0xdc, 0xc8, 0x71, 0x4f, 0xc3, 0xce, 0x18,
0x2c, 0x29, 0x5b, 0x4a, 0x49, 0x57, 0x57, 0x9f, 0xd2, 0x40, 0x22, 0x33, 0x1a, 0xa8, 0x9b, 0x69,
0x63, 0x69, 0xa2, 0xcf, 0x60, 0xc3, 0x0f, 0x02, 0x2a, 0x68, 0x1c, 0xf9, 0xe1, 0x74, 0x46, 0x03,
0x6e, 0x9b, 0x03, 0x73, 0xb7, 0x8d, 0x3b, 0x2b, 0xf8, 0x2d, 0x0d, 0xb8, 0x33, 0x82, 0x9a, 0x4e,
0x5f, 0xd6, 0x51, 0xdc, 0x27, 0x44, 0xcf, 0x17, 0x56, 0xb6, 0xc4, 0xe6, 0x3e, 0xd3, 0x91, 0x2d,
0xac, 0x6c, 0x89, 0xf1, 0xf8, 0x5a, 0x5f, 0xb8, 0x85, 0x95, 0xed, 0x0c, 0x60, 0x5d, 0xf7, 0x51,
0x69, 0x43, 0x9f, 0x02, 0x78, 0xe2, 0xbe, 0xb4, 0x7b, 0x65, 0x7b, 0xdd, 0xd2, 0x40, 0xcc, 0x33,
0x11, 0xda, 0x91, 0x6d, 0x34, 0x27, 0x74, 0x36, 0xd7, 0xa7, 0xb5, 0x71, 0xe6, 0x39, 0x1b, 0xd0,
0x76, 0x6f, 0x48, 0x24, 0x78, 0x3e, 0x1f, 0x3f, 0x1b, 0x50, 0x55, 0x48, 0xe9, 0x92, 0x38, 0xc8,
0xe4, 0xc9, 0xf8, 0x9d, 0xc3, 0x4f, 0x8a, 0xee, 0x51, 0x05, 0x98, 0xdc, 0x27, 0x24, 0x53, 0x9f,
0x65, 0x69, 0xae, 0xb2, 0xfc, 0x60, 0xa2, 0xac, 0x17, 0x13, 0xa5, 0xe7, 0x76, 0x39, 0xa6, 0xce,
0x2f, 0x06, 0xb4, 0x33, 0x20, 0x2b, 0xcd, 0x7f, 0x58, 0x62, 0x34, 0xa2, 0xc2, 0x5b, 0x26, 0x92,
0xbb, 0xe8, 0x1b, 0x68, 0x26, 0x2c, 0xbe, 0x22, 0x9c, 0x13, 0x99, 0x8a, 0x6c, 0xcf, 0xd7, 0x45,
0xb2, 0x3c, 0x4d, 0xc2, 0x2b, 0xb6, 0x73, 0x0a, 0xf5, 0x0c, 0x2d, 0xb8, 0x8a, 0x7d, 0x39, 0xe9,
0xbe, 0xc8, 0x4b, 0xb5, 0x55, 0x14, 0x53, 0x6b, 0xd2, 0x3c, 0x29, 0xda, 0xf3, 0x53, 0xbe, 0x14,
0xbd, 0x01, 0x6d, 0x4c, 0x78, 0xba, 0xc8, 0x81, 0x3d, 0x17, 0x9a, 0xcb, 0xda, 0xa2, 0x06, 0x58,
0xee, 0xbb, 0x93, 0x49, 0x77, 0x0d, 0xd5, 0xc1, 0x3c, 0x3f, 0xff, 0xa1, 0x6b, 0x20, 0x80, 0xda,
0x31, 0x76, 0x8f, 0x26, 0x6e, 0xb7, 0x82, 0x9a, 0x50, 0x1d, 0x4f, 0x8e, 0xf0, 0xa4, 0x6b, 0xa2,
0x0e, 0x80, 0xfb, 0xce, 0x3d, 0x9e, 0x1e, 0x8d, 0x46, 0xee, 0xa8, 0x6b, 0xed, 0x7d, 0x0b, 0x55,
0x75, 0x2e, 0x6a, 0x41, 0x7d, 0x3c, 0x39, 0xf7, 0x3c, 0x77, 0xd4, 0x5d, 0x93, 0x0e, 0xbe, 0x38,
0x3b, 0x3b, 0x39, 0x7b, 0xab, 0x23, 0x79, 0x47, 0x17, 0x63, 0x77, 0xd4, 0xad, 0xc8, 0x0f, 0x3a,
0xea, 0xa8, 0x6b, 0x1e, 0xfe, 0x5a, 0x05, 0x6b, 0x3c, 0xa7, 0x0b, 0x74, 0x0e, 0x35, 0xbd, 0x7d,
0xd1, 0x4e, 0x91, 0xb0, 0x67, 0x2f, 0x4e, 0xcf, 0xf9, 0x18, 0x25, 0xbb, 0xd0, 0x23, 0x95, 0x15,
0x13, 0x68, 0x50, 0x52, 0xa8, 0xe5, 0x16, 0xef, 0x6d, 0x0e, 0xf5, 0x33, 0x38, 0xcc, 0x9f, 0xc1,
0xa1, 0x2b, 0x9f, 0x41, 0x99, 0x93, 0x5e, 0xdd, 0xc5, 0x39, 0x3d, 0xdb, 0xfc, 0xc5, 0x39, 0x7d,
0xb0, 0xf9, 0x4f, 0xc0, 0x92, 0xf3, 0x88, 0xb6, 0x0b, 0xdb, 0x7c, 0xb5, 0xf1, 0x7b, 0x83, 0x72,
0x42, 0x16, 0xea, 0x3b, 0x30, 0x3d, 0x71, 0x8f, 0xfa, 0x85, 0x9d, 0xb5, 0x9c, 0xe8, 0x52, 0x69,
0xdf, 0x43, 0x4d, 0x4f, 0x6a, 0xb1, 0xb4, 0x67, 0x53, 0xdc, 0xdb, 0x2a, 0xa5, 0x7c, 0x69, 0xa0,
0xd3, 0xfc, 0xf6, 0x07, 0xe5, 0x0d, 0x99, 0xc5, 0xd9, 0xf9, 0x08, 0x63, 0x75, 0x6b, 0xaa, 0x67,
0x8b, 0xa3, 0x3d, 0x6d, 0xe7, 0x52, 0x69, 0xc7, 0x50, 0xd3, 0x6d, 0x5e, 0x2c, 0xed, 0xd9, 0x08,
0x94, 0x05, 0x79, 0xf3, 0xff, 0xf7, 0x0f, 0xfd, 0xb5, 0x3f, 0x1f, 0xfa, 0x6b, 0xff, 0x3c, 0xf4,
0x8d, 0x9f, 0x1e, 0xfb, 0xc6, 0xfb, 0xc7, 0xbe, 0xf1, 0xfb, 0x63, 0xdf, 0xf8, 0xfb, 0xb1, 0x6f,
0x5c, 0xd6, 0x14, 0xfb, 0xab, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x48, 0xee, 0xf6, 0x5f,
0x09, 0x00, 0x00,
}

View file

@ -105,7 +105,8 @@ message StateRequest {
message StateResponse {
string id = 1 [(gogoproto.customname) = "ID"];
string bundle = 2;
repeated Process processes = 3;
uint32 initPid = 3;
repeated Process processes = 4;
}
// TODO: share core runtime types between shim and execution rpcs

View file

@ -9,7 +9,6 @@ import (
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
@ -26,8 +25,10 @@ import (
"github.com/docker/containerd/supervisor"
"github.com/docker/containerd/utils"
metrics "github.com/docker/go-metrics"
"github.com/pkg/errors"
"github.com/urfave/cli"
natsd "github.com/nats-io/gnatsd/server"
"github.com/nats-io/go-nats"
stand "github.com/nats-io/nats-streaming-server/server"
)
@ -42,6 +43,11 @@ const usage = `
high performance container runtime
`
const (
StanClusterID = "containerd"
stanClientID = "containerd"
)
func main() {
app := cli.NewApp()
app.Name = "containerd"
@ -127,19 +133,12 @@ func main() {
}
// Get events publisher
nec, err := getNATSPublisher(ea)
natsPoster, err := events.NewNATSPoster(StanClusterID, stanClientID)
if err != nil {
return err
}
defer nec.Close()
execCtx := log.WithModule(ctx, "execution")
execCtx = events.WithPoster(execCtx, events.GetNATSPoster(nec))
root := filepath.Join(context.GlobalString("root"), "shim")
err = os.Mkdir(root, 0700)
if err != nil && !os.IsExist(err) {
return err
}
execCtx = events.WithPoster(execCtx, natsPoster)
execService, err := supervisor.New(execCtx, context.GlobalString("root"))
if err != nil {
return err
@ -151,7 +150,7 @@ func main() {
switch info.Server.(type) {
case api.ExecutionServiceServer:
ctx = log.WithModule(ctx, "execution")
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
ctx = events.WithPoster(ctx, natsPoster)
default:
fmt.Printf("Unknown type: %#v\n", info.Server)
}
@ -218,25 +217,10 @@ func dumpStacks(ctx gocontext.Context) {
log.G(ctx).Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}
func startNATSServer(eventsAddress string) (e *stand.StanServer, err error) {
eventsURL, err := url.Parse(eventsAddress)
if err != nil {
return nil, err
}
no := stand.DefaultNatsServerOptions
nOpts := &no
nOpts.NoSigs = true
parts := strings.Split(eventsURL.Host, ":")
nOpts.Host = parts[0]
if len(parts) == 2 {
nOpts.Port, err = strconv.Atoi(parts[1])
} else {
nOpts.Port = nats.DefaultPort
}
func startNATSServer(address string) (s *stand.StanServer, err error) {
defer func() {
if r := recover(); r != nil {
e = nil
s = nil
if _, ok := r.(error); !ok {
err = fmt.Errorf("failed to start NATS server: %v", r)
} else {
@ -244,21 +228,32 @@ func startNATSServer(eventsAddress string) (e *stand.StanServer, err error) {
}
}
}()
s := stand.RunServerWithOpts(nil, nOpts)
return s, nil
}
func getNATSPublisher(eventsAddress string) (*nats.EncodedConn, error) {
nc, err := nats.Connect(eventsAddress)
so, no, err := getServerOptions(address)
if err != nil {
return nil, err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
s = stand.RunServerWithOpts(so, no)
return s, err
}
func getServerOptions(address string) (*stand.Options, *natsd.Options, error) {
url, err := url.Parse(address)
if err != nil {
nc.Close()
return nil, err
return nil, nil, errors.Wrapf(err, "failed to parse address url %q", address)
}
return nec, nil
no := stand.DefaultNatsServerOptions
parts := strings.Split(url.Host, ":")
if len(parts) == 2 {
no.Port, err = strconv.Atoi(parts[1])
} else {
no.Port = nats.DefaultPort
}
no.Host = parts[0]
so := stand.GetDefaultOptions()
so.ID = StanClusterID
return so, &no, nil
}

View file

@ -29,18 +29,19 @@ var deleteCommand = cli.Command{
return fmt.Errorf("container id must be provided")
}
pid := context.String("pid")
if pid != "" {
pid := uint32(context.Int64("pid"))
if pid != 0 {
_, err = executionService.DeleteProcess(gocontext.Background(), &execution.DeleteProcessRequest{
ContainerID: id,
ProcessID: pid,
Pid: pid,
})
if err != nil {
return err
}
return nil
}
if _, err := executionService.Delete(gocontext.Background(), &execution.DeleteContainerRequest{
if _, err := executionService.DeleteContainer(gocontext.Background(), &execution.DeleteContainerRequest{
ID: id,
}); err != nil {
return err

View file

@ -3,6 +3,7 @@ package main
import (
"os"
"path/filepath"
"time"
gocontext "context"
@ -18,10 +19,6 @@ var execCommand = cli.Command{
Name: "id, i",
Usage: "target container id",
},
cli.StringFlag{
Name: "pid, p",
Usage: "new process id",
},
cli.StringFlag{
Name: "cwd, c",
Usage: "current working directory for the process",
@ -42,17 +39,16 @@ var execCommand = cli.Command{
return err
}
id := context.String("id")
tmpDir, err := getTempDir(id)
tmpDir, err := getTempDir(time.Now().Format("2006-02-01_15:04:05"))
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
id := context.String("id")
sOpts := &execution.StartProcessRequest{
ContainerID: id,
Process: &execution.Process{
ID: context.String("pid"),
Cwd: context.String("cwd"),
Terminal: context.Bool("tty"),
Args: context.Args(),
@ -76,7 +72,7 @@ var execCommand = cli.Command{
_, err = executionService.DeleteProcess(gocontext.Background(), &execution.DeleteProcessRequest{
ContainerID: id,
ProcessID: sr.Process.ID,
Pid: sr.Process.Pid,
})
if err != nil {
return err

View file

@ -22,13 +22,13 @@ var inspectCommand = cli.Command{
if id == "" {
return fmt.Errorf("container id must be provided")
}
getResponse, err := executionService.Get(gocontext.Background(),
getResponse, err := executionService.GetContainer(gocontext.Background(),
&execution.GetContainerRequest{ID: id})
if err != nil {
return err
}
listProcResponse, err := executionService.ListProcesses(gocontext.Background(),
&execution.ListProcessesRequest{ID: id})
&execution.ListProcessesRequest{ContainerID: id})
if err != nil {
return err
}

View file

@ -16,7 +16,7 @@ var listCommand = cli.Command{
if err != nil {
return err
}
listResponse, err := executionService.List(gocontext.Background(), &execution.ListContainersRequest{
listResponse, err := executionService.ListContainers(gocontext.Background(), &execution.ListContainersRequest{
Owner: []string{},
})
if err != nil {
@ -25,7 +25,7 @@ var listCommand = cli.Command{
fmt.Printf("ID\tSTATUS\tPROCS\tBUNDLE\n")
for _, c := range listResponse.Containers {
listProcResponse, err := executionService.ListProcesses(gocontext.Background(),
&execution.ListProcessesRequest{ID: c.ID})
&execution.ListProcessesRequest{ContainerID: c.ID})
if err != nil {
return err
}

View file

@ -1,6 +1,7 @@
package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
@ -8,10 +9,12 @@ import (
gocontext "context"
"github.com/crosbymichael/console"
"github.com/docker/containerd/api/execution"
execEvents "github.com/docker/containerd/execution"
"github.com/docker/docker/pkg/term"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
@ -39,20 +42,23 @@ var runCommand = cli.Command{
}
// setup our event subscriber
nc, err := nats.Connect(nats.DefaultURL)
sc, err := stan.Connect("containerd", "ctr", stan.ConnectWait(5*time.Second))
if err != nil {
return err
}
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
nc.Close()
return err
}
defer nec.Close()
defer sc.Close()
evCh := make(chan *execEvents.ContainerExitEvent, 64)
sub, err := nec.Subscribe(execEvents.ContainersEventsSubjectSubscriber, func(e *execEvents.ContainerExitEvent) {
evCh <- e
evCh := make(chan *execEvents.ContainerEvent, 64)
sub, err := sc.Subscribe(fmt.Sprintf("containers.%s", id), func(m *stan.Msg) {
var e execEvents.ContainerEvent
err := json.Unmarshal(m.Data, &e)
if err != nil {
fmt.Printf("failed to unmarshal event: %v", err)
return
}
evCh <- &e
})
if err != nil {
return err
@ -78,19 +84,12 @@ var runCommand = cli.Command{
Stderr: filepath.Join(tmpDir, "stderr"),
}
var oldState *term.State
restoreTerm := func() {
if oldState != nil {
term.RestoreTerminal(os.Stdin.Fd(), oldState)
}
}
if crOpts.Console {
oldState, err = term.SetRawTerminal(os.Stdin.Fd())
if err != nil {
con := console.Current()
defer con.Reset()
if err := con.SetRaw(); err != nil {
return err
}
defer restoreTerm()
}
fwg, err := prepareStdio(crOpts.Stdin, crOpts.Stdout, crOpts.Stderr, crOpts.Console)
@ -98,15 +97,15 @@ var runCommand = cli.Command{
return err
}
cr, err := executionService.Create(gocontext.Background(), crOpts)
cr, err := executionService.CreateContainer(gocontext.Background(), crOpts)
if err != nil {
return err
return errors.Wrap(err, "CreateContainer RPC failed")
}
if _, err := executionService.Start(gocontext.Background(), &execution.StartContainerRequest{
if _, err := executionService.StartContainer(gocontext.Background(), &execution.StartContainerRequest{
ID: cr.Container.ID,
}); err != nil {
return err
return errors.Wrap(err, "StartContainer RPC failed")
}
var ec uint32
@ -118,28 +117,33 @@ var runCommand = cli.Command{
break eventLoop
}
if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID {
ec = e.StatusCode
if e.Type != "exit" {
continue
}
if e.ID == cr.Container.ID && e.Pid == cr.InitProcess.Pid {
ec = e.ExitStatus
break eventLoop
}
case <-time.After(1 * time.Second):
if nec.Conn.Status() != nats.CONNECTED {
if sc.NatsConn().Status() != nats.CONNECTED {
break eventLoop
}
}
}
if _, err := executionService.Delete(gocontext.Background(), &execution.DeleteContainerRequest{
if _, err := executionService.DeleteContainer(gocontext.Background(), &execution.DeleteContainerRequest{
ID: cr.Container.ID,
}); err != nil {
return err
return errors.Wrap(err, "DeleteContainer RPC failed")
}
// Ensure we read all io
fwg.Wait()
restoreTerm()
os.Exit(int(ec))
if ec != 0 {
return cli.NewExitError("", int(ec))
}
return nil
},

View file

@ -14,8 +14,8 @@ import (
gocontext "context"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/api/execution"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
"github.com/urfave/cli"
"google.golang.org/grpc"
@ -39,7 +39,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
}(f)
go func(w io.WriteCloser) {
io.Copy(w, os.Stdin)
logrus.Info("stdin copy finished")
w.Close()
}(f)
@ -56,7 +55,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) {
io.Copy(os.Stdout, r)
r.Close()
logrus.Info("stdout copy finished")
wg.Done()
}(f)
@ -74,7 +72,6 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) {
io.Copy(os.Stderr, r)
r.Close()
logrus.Info("stderr copy finished")
wg.Done()
}(f)
}
@ -99,7 +96,7 @@ func getGRPCConnection(context *cli.Context) (*grpc.ClientConn, error) {
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to dial %q", bindSocket)
}
grpcConn = conn

View file

@ -2,30 +2,48 @@ package events
import (
"context"
"strings"
"encoding/json"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/log"
nats "github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming"
"github.com/pkg/errors"
)
type natsPoster struct {
nec *nats.EncodedConn
sc stan.Conn
}
func GetNATSPoster(nec *nats.EncodedConn) Poster {
return &natsPoster{nec}
func NewNATSPoster(clusterID, clientID string) (Poster, error) {
sc, err := stan.Connect(clusterID, clientID, stan.ConnectWait(5*time.Second))
if err != nil {
return nil, errors.Wrap(err, "failed to connect to nats streaming server")
}
return &natsPoster{sc}, nil
}
func (p *natsPoster) Post(ctx context.Context, e Event) {
subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1)
topic := getTopic(ctx)
if topic != "" {
subject = strings.Join([]string{subject, topic}, ".")
if topic == "" {
log.G(ctx).WithField("event", e).Warn("unable to post event, topic is empty")
return
}
if subject == "" {
log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty")
data, err := json.Marshal(e)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{"event": e, "topic": topic}).
Warn("unable to marshal event")
return
}
p.nec.Publish(subject, e)
err = p.sc.Publish(topic, data)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{"event": e, "topic": topic}).
Warn("unable to post event")
}
log.G(ctx).WithFields(logrus.Fields{"event": e, "topic": topic}).
Debug("Posted event")
}

View file

@ -2,16 +2,20 @@ package execution
import "time"
const (
ExitEvent = "exit"
OOMEvent = "oom"
CreateEvent = "create"
StartEvent = "start"
ExecEvent = "exec-added9"
)
type ContainerEvent struct {
Timestamp time.Time
ID string
Action string
}
type ContainerExitEvent struct {
ContainerEvent
PID string
StatusCode uint32
Type string
Pid uint32
ExitStatus uint32
}
const (

View file

@ -1,429 +0,0 @@
package shim
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/log"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
runc "github.com/crosbymichael/go-runc"
starttime "github.com/opencontainers/runc/libcontainer/system"
)
type newProcessOpts struct {
shimBinary string
runtime string
runtimeArgs []string
container *execution.Container
exec bool
stateDir string
execution.StartProcessOpts
}
func validateNewProcessOpts(o newProcessOpts) error {
if o.shimBinary == "" {
return errors.New("shim binary not specified")
}
if o.runtime == "" {
return errors.New("runtime not specified")
}
if o.container == nil {
return errors.New("container not specified")
}
if o.container.ID() == "" {
return errors.New("container id not specified")
}
if o.container.Bundle() == "" {
return errors.New("bundle not specified")
}
if o.stateDir == "" {
return errors.New("state dir not specified")
}
return nil
}
func newProcess(ctx context.Context, o newProcessOpts) (p *process, err error) {
if err = validateNewProcessOpts(o); err != nil {
return
}
p = &process{
id: o.ID,
stateDir: o.stateDir,
exitChan: make(chan struct{}),
ctx: ctx,
}
defer func() {
if err != nil {
p.cleanup()
p = nil
}
}()
if err = os.Mkdir(o.stateDir, 0700); err != nil {
err = errors.Wrap(err, "failed to create process state dir")
return
}
p.exitPipe, p.controlPipe, err = getControlPipes(o.stateDir)
if err != nil {
return
}
cmd, err := newShimProcess(o)
if err != nil {
return
}
defer func() {
if err != nil {
cmd.Process.Kill()
cmd.Wait()
}
}()
abortCh := make(chan syscall.WaitStatus, 1)
go func() {
var shimStatus syscall.WaitStatus
if err := cmd.Wait(); err != nil {
shimStatus = execution.UnknownStatusCode
} else {
shimStatus = cmd.ProcessState.Sys().(syscall.WaitStatus)
}
abortCh <- shimStatus
close(abortCh)
}()
p.pid, p.startTime, p.status, err = waitUntilReady(ctx, abortCh, o.stateDir)
if err != nil {
return
}
return
}
func loadProcess(ctx context.Context, stateDir, id string) (p *process, err error) {
p = &process{
id: id,
stateDir: stateDir,
exitChan: make(chan struct{}),
status: execution.Running,
ctx: ctx,
}
defer func() {
if err != nil {
p.cleanup()
p = nil
}
}()
p.pid, err = getPidFromFile(filepath.Join(stateDir, pidFilename))
if err != nil {
err = errors.Wrap(err, "failed to read pid")
return
}
p.startTime, err = getStartTimeFromFile(filepath.Join(stateDir, startTimeFilename))
if err != nil {
return
}
path := filepath.Join(stateDir, exitPipeFilename)
p.exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
err = errors.Wrapf(err, "failed to open exit pipe")
return
}
path = filepath.Join(stateDir, controlPipeFilename)
p.controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0)
if err != nil {
err = errors.Wrapf(err, "failed to open control pipe")
return
}
markAsStopped := func(p *process) (*process, error) {
p.setStatus(execution.Stopped)
return p, nil
}
if err = syscall.Kill(int(p.pid), 0); err != nil {
if err == syscall.ESRCH {
return markAsStopped(p)
}
err = errors.Wrapf(err, "failed to check if process is still alive")
return
}
cstime, err := starttime.GetProcessStartTime(int(p.pid))
if err != nil {
if os.IsNotExist(err) {
return markAsStopped(p)
}
err = errors.Wrapf(err, "failed retrieve current process start time")
return
}
if p.startTime != cstime {
return markAsStopped(p)
}
return
}
type process struct {
stateDir string
id string
pid int64
exitChan chan struct{}
exitPipe *os.File
controlPipe *os.File
startTime string
status execution.Status
ctx context.Context
mu sync.Mutex
}
func (p *process) ID() string {
return p.id
}
func (p *process) Pid() int64 {
return p.pid
}
func (p *process) Wait() (uint32, error) {
<-p.exitChan
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Debugf("wait is over")
// Cleanup those fds
p.exitPipe.Close()
p.controlPipe.Close()
// If the container process is still alive, it means the shim crashed
// and the child process had updated it PDEATHSIG to something
// else than SIGKILL. Or that epollCtl failed
if p.isAlive() {
err := syscall.Kill(int(p.pid), syscall.SIGKILL)
if err != nil {
return execution.UnknownStatusCode, errors.Wrap(err, "failed to kill process")
}
return uint32(128 + int(syscall.SIGKILL)), nil
}
data, err := ioutil.ReadFile(filepath.Join(p.stateDir, exitStatusFilename))
if err != nil {
return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status")
}
if len(data) == 0 {
return execution.UnknownStatusCode, errors.New(execution.ErrProcessNotExited.Error())
}
status, err := strconv.Atoi(string(data))
if err != nil {
return execution.UnknownStatusCode, errors.Wrapf(err, "failed to parse exit status")
}
p.setStatus(execution.Stopped)
return uint32(status), nil
}
func (p *process) Signal(sig os.Signal) error {
err := syscall.Kill(int(p.pid), sig.(syscall.Signal))
if err != nil {
return errors.Wrap(err, "failed to signal process")
}
return nil
}
func (p *process) Status() execution.Status {
p.mu.Lock()
s := p.status
p.mu.Unlock()
return s
}
func (p *process) setStatus(s execution.Status) {
p.mu.Lock()
p.status = s
p.mu.Unlock()
}
func (p *process) isAlive() bool {
if err := syscall.Kill(int(p.pid), 0); err != nil {
if err == syscall.ESRCH {
return false
}
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Warnf("kill(0) failed: %v", err)
return false
}
// check that we have the same starttime
stime, err := starttime.GetProcessStartTime(int(p.pid))
if err != nil {
if os.IsNotExist(err) {
return false
}
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Warnf("failed to get process start time: %v", err)
return false
}
if p.startTime != stime {
return false
}
return true
}
func (p *process) cleanup() {
for _, f := range []*os.File{p.exitPipe, p.controlPipe} {
if f != nil {
f.Close()
}
}
if err := os.RemoveAll(p.stateDir); err != nil {
log.G(p.ctx).Warnf("failed to remove process state dir: %v", err)
}
}
func waitUntilReady(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int64, stime string, status execution.Status, err error) {
status = execution.Unknown
for {
select {
case <-ctx.Done():
return
case wait := <-abortCh:
if wait.Signaled() {
err = errors.Errorf("shim died prematurely: %v", wait.Signal())
return
}
err = errors.Errorf("shim exited prematurely with exit code %v", wait.ExitStatus())
return
default:
}
pid, err = getPidFromFile(filepath.Join(root, pidFilename))
if err == nil {
break
} else if !os.IsNotExist(err) {
return
}
}
status = execution.Created
stime, err = starttime.GetProcessStartTime(int(pid))
switch {
case os.IsNotExist(err):
status = execution.Stopped
case err != nil:
return
default:
var b []byte
path := filepath.Join(root, startTimeFilename)
b, err = ioutil.ReadFile(path)
switch {
case os.IsNotExist(err):
err = ioutil.WriteFile(path, []byte(stime), 0600)
if err != nil {
return
}
case err != nil:
err = errors.Wrapf(err, "failed to get start time for pid %d", pid)
return
case string(b) != stime:
status = execution.Stopped
}
}
return pid, stime, status, nil
}
func newShimProcess(o newProcessOpts) (*exec.Cmd, error) {
cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime)
cmd.Dir = o.stateDir
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
state := processState{
Process: o.Spec,
Exec: o.exec,
Stdin: o.Stdin,
Stdout: o.Stdout,
Stderr: o.Stderr,
RuntimeArgs: o.runtimeArgs,
NoPivotRoot: false,
CheckpointPath: "",
RootUID: int(o.Spec.User.UID),
RootGID: int(o.Spec.User.GID),
}
f, err := os.Create(filepath.Join(o.stateDir, "process.json"))
if err != nil {
return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID())
}
defer f.Close()
if err := json.NewEncoder(f).Encode(state); err != nil {
return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID())
}
if err := cmd.Start(); err != nil {
return nil, errors.Wrapf(err, "failed to start shim for container %s", o.container.ID())
}
return cmd, nil
}
func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) {
path := filepath.Join(root, exitPipeFilename)
if err = unix.Mkfifo(path, 0700); err != nil {
err = errors.Wrap(err, "failed to create shim exit fifo")
return
}
if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil {
err = errors.Wrap(err, "failed to open shim exit fifo")
return
}
path = filepath.Join(root, controlPipeFilename)
if err = unix.Mkfifo(path, 0700); err != nil {
err = errors.Wrap(err, "failed to create shim control fifo")
return
}
if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil {
err = errors.Wrap(err, "failed to open shim control fifo")
return
}
return
}
func getPidFromFile(path string) (int64, error) {
pid, err := runc.ReadPidFile(path)
if err != nil {
return -1, err
}
return int64(pid), nil
}
func getStartTimeFromFile(path string) (string, error) {
stime, err := ioutil.ReadFile(path)
if err != nil {
return "", errors.Wrapf(err, "failed to read start time")
}
return string(stime), nil
}

View file

@ -1,431 +0,0 @@
package shim
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/log"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
const (
DefaultShimBinary = "containerd-shim"
pidFilename = "pid"
startTimeFilename = "starttime"
exitPipeFilename = "exit"
controlPipeFilename = "control"
exitStatusFilename = "exitStatus"
)
func New(ctx context.Context, root, shim, runtime string, runtimeArgs []string) (*ShimRuntime, error) {
fd, err := syscall.EpollCreate1(0)
if err != nil {
return nil, errors.Wrap(err, "epollcreate1 failed")
}
s := &ShimRuntime{
ctx: ctx,
epollFd: fd,
root: root,
binaryName: shim,
runtime: runtime,
runtimeArgs: runtimeArgs,
exitChannels: make(map[int]*process),
containers: make(map[string]*execution.Container),
}
s.loadContainers()
go s.monitor()
return s, nil
}
type ShimRuntime struct {
ctx context.Context
mutex sync.Mutex
exitChannels map[int]*process
containers map[string]*execution.Container
epollFd int
root string
binaryName string
runtime string
runtimeArgs []string
}
type ProcessOpts struct {
Bundle string
Terminal bool
Stdin string
Stdout string
Stderr string
}
type processState struct {
specs.Process
Exec bool `json:"exec"`
Stdin string `json:"containerdStdin"`
Stdout string `json:"containerdStdout"`
Stderr string `json:"containerdStderr"`
RuntimeArgs []string `json:"runtimeArgs"`
NoPivotRoot bool `json:"noPivotRoot"`
CheckpointPath string `json:"checkpoint"`
RootUID int `json:"rootUID"`
RootGID int `json:"rootGID"`
}
func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (*execution.Container, error) {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id, "options": o}).Debug("Create()")
if s.getContainer(id) != nil {
return nil, execution.ErrContainerExists
}
containerCtx := log.WithModule(log.WithModule(ctx, "container"), id)
container, err := execution.NewContainer(containerCtx, filepath.Join(s.root, id), id, o.Bundle)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
container.Cleanup()
}
}()
// extract Process spec from bundle's config.json
var spec specs.Spec
f, err := os.Open(filepath.Join(o.Bundle, "config.json"))
if err != nil {
return nil, errors.Wrap(err, "failed to open config.json")
}
defer f.Close()
if err := json.NewDecoder(f).Decode(&spec); err != nil {
return nil, errors.Wrap(err, "failed to decode container OCI specs")
}
processOpts := newProcessOpts{
shimBinary: s.binaryName,
runtime: s.runtime,
runtimeArgs: s.runtimeArgs,
container: container,
exec: false,
stateDir: container.ProcessStateDir(execution.InitProcessID),
StartProcessOpts: execution.StartProcessOpts{
ID: execution.InitProcessID,
Spec: spec.Process,
Console: o.Console,
Stdin: o.Stdin,
Stdout: o.Stdout,
Stderr: o.Stderr,
},
}
processCtx := log.WithModule(log.WithModule(containerCtx, "process"), execution.InitProcessID)
process, err := newProcess(processCtx, processOpts)
if err != nil {
return nil, err
}
s.monitorProcess(process)
container.AddProcess(process)
s.addContainer(container)
return container, nil
}
func (s *ShimRuntime) Start(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Start()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "start", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s start' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) List(ctx context.Context) ([]*execution.Container, error) {
log.G(s.ctx).Debug("List()")
containers := make([]*execution.Container, 0)
s.mutex.Lock()
for _, c := range s.containers {
containers = append(containers, c)
}
s.mutex.Unlock()
return containers, nil
}
func (s *ShimRuntime) Load(ctx context.Context, id string) (*execution.Container, error) {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id}).Debug("Start()")
s.mutex.Lock()
c, ok := s.containers[id]
s.mutex.Unlock()
if !ok {
return nil, errors.New(execution.ErrContainerNotFound.Error())
}
return c, nil
}
func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Delete()")
if c.Status() != execution.Stopped {
return errors.Errorf("cannot delete a container in the '%s' state", c.Status())
}
c.Cleanup()
s.removeContainer(c)
return nil
}
func (s *ShimRuntime) Pause(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Pause()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "pause", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s pause' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) Resume(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Resume()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "resume", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s resume' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "options": o}).Debug("StartProcess()")
processOpts := newProcessOpts{
shimBinary: s.binaryName,
runtime: s.runtime,
runtimeArgs: s.runtimeArgs,
container: c,
exec: true,
stateDir: c.ProcessStateDir(o.ID),
StartProcessOpts: o,
}
processCtx := log.WithModule(log.WithModule(c.Context(), "process"), execution.InitProcessID)
process, err := newProcess(processCtx, processOpts)
if err != nil {
return nil, err
}
process.status = execution.Running
s.monitorProcess(process)
c.AddProcess(process)
return process, nil
}
func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id, "signal": sig}).
Debug("SignalProcess()")
process := c.GetProcess(id)
if process == nil {
return errors.Errorf("container %s has no process named %s", c.ID(), id)
}
err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal))
if err != nil {
return errors.Wrapf(err, "failed to send %v signal to container %s process %v", sig, c.ID(), process.Pid())
}
return err
}
func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}).
Debug("DeleteProcess()")
if p := c.GetProcess(id); p != nil {
p.(*process).cleanup()
return c.RemoveProcess(id)
}
return errors.Errorf("container %s has no process named %s", c.ID(), id)
}
func (s *ShimRuntime) monitor() {
var events [128]syscall.EpollEvent
for {
n, err := syscall.EpollWait(s.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
log.G(s.ctx).Error("epollwait failed:", err)
}
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
s.mutex.Lock()
p := s.exitChannels[fd]
delete(s.exitChannels, fd)
s.mutex.Unlock()
if err = syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
}); err != nil {
log.G(s.ctx).Error("epollctl deletion failed:", err)
}
close(p.exitChan)
}
}
}
func (s *ShimRuntime) addContainer(c *execution.Container) {
s.mutex.Lock()
s.containers[c.ID()] = c
s.mutex.Unlock()
}
func (s *ShimRuntime) removeContainer(c *execution.Container) {
s.mutex.Lock()
delete(s.containers, c.ID())
s.mutex.Unlock()
}
func (s *ShimRuntime) getContainer(id string) *execution.Container {
s.mutex.Lock()
c := s.containers[id]
s.mutex.Unlock()
return c
}
// monitorProcess adds a process to the list of monitored process if
// we fail to do so, we closed the exitChan channel used by Wait().
// Since service always call on Wait() for generating "exit" events,
// this will ensure the process gets killed
func (s *ShimRuntime) monitorProcess(p *process) {
if p.status == execution.Stopped {
close(p.exitChan)
return
}
fd := int(p.exitPipe.Fd())
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
s.mutex.Lock()
s.exitChannels[fd] = p
s.mutex.Unlock()
if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
s.mutex.Lock()
delete(s.exitChannels, fd)
s.mutex.Unlock()
close(p.exitChan)
return
}
// TODO: take care of the OOM handler
}
func (s *ShimRuntime) unmonitorProcess(p *process) {
s.mutex.Lock()
for fd, proc := range s.exitChannels {
if proc == p {
delete(s.exitChannels, fd)
break
}
}
s.mutex.Unlock()
}
func (s *ShimRuntime) loadContainers() {
cs, err := ioutil.ReadDir(s.root)
if err != nil {
log.G(s.ctx).WithField("statedir", s.root).
Warn("failed to load containers, state dir cannot be listed:", err)
return
}
for _, c := range cs {
if !c.IsDir() {
continue
}
stateDir := filepath.Join(s.root, c.Name())
containerCtx := log.WithModule(log.WithModule(s.ctx, "container"), c.Name())
container, err := execution.LoadContainer(containerCtx, stateDir, c.Name())
if err != nil {
log.G(s.ctx).WithField("container-id", c.Name()).Warn(err)
continue
}
processDirs, err := container.ProcessesStateDir()
if err != nil {
log.G(s.ctx).WithField("container-id", c.Name()).Warn(err)
continue
}
for processID, processStateDir := range processDirs {
processCtx := log.WithModule(log.WithModule(containerCtx, "process"), processID)
var p *process
p, err = loadProcess(processCtx, processStateDir, processID)
if err != nil {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": c.Name(), "process": processID}).Warn(err)
break
}
if processID == execution.InitProcessID && p.status == execution.Running {
p.status = s.loadContainerStatus(container.ID())
}
container.AddProcess(p)
}
// if successfull, add the container to our list
if err == nil {
for _, p := range container.Processes() {
s.monitorProcess(p.(*process))
}
s.addContainer(container)
log.G(s.ctx).Infof("restored container %s", container.ID())
}
}
}
func (s *ShimRuntime) loadContainerStatus(id string) execution.Status {
cmd := exec.Command(s.runtime, append(s.runtimeArgs, "state", id)...)
out, err := cmd.CombinedOutput()
if err != nil {
return execution.Unknown
}
var st struct{ Status string }
if err := json.NewDecoder(bytes.NewReader(out)).Decode(&st); err != nil {
return execution.Unknown
}
return execution.Status(st.Status)
}

View file

@ -1,12 +0,0 @@
package execution
type Supervisor struct {
}
type waiter interface {
Wait() (uint32, error)
}
func (s *Supervisor) Monitor(w waiter, cb func(uint32, error)) {
go cb(w.Wait())
}

View file

@ -1,7 +1,6 @@
package shim
import (
"fmt"
"sync"
"syscall"
@ -9,6 +8,7 @@ import (
apishim "github.com/docker/containerd/api/shim"
"github.com/docker/containerd/utils"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -25,6 +25,7 @@ func New() *Service {
type Service struct {
initProcess *initProcess
id string
bundle string
mu sync.Mutex
processes map[int]process
events chan *apishim.Event
@ -37,10 +38,11 @@ func (s *Service) Create(ctx context.Context, r *apishim.CreateRequest) (*apishi
return nil, err
}
s.mu.Lock()
s.id = r.ID
s.bundle = r.Bundle
s.initProcess = process
pid := process.Pid()
s.processes[pid] = process
s.id = r.ID
s.mu.Unlock()
s.events <- &apishim.Event{
Type: apishim.EventType_CREATE,
@ -69,7 +71,7 @@ func (s *Service) Delete(ctx context.Context, r *apishim.DeleteRequest) (*apishi
p, ok := s.processes[int(r.Pid)]
s.mu.Unlock()
if !ok {
return nil, fmt.Errorf("process does not exist %d", r.Pid)
return nil, errors.Errorf("process does not exist %d", r.Pid)
}
if err := p.Delete(ctx); err != nil {
return nil, err
@ -104,7 +106,7 @@ func (s *Service) Exec(ctx context.Context, r *apishim.ExecRequest) (*apishim.Ex
func (s *Service) Pty(ctx context.Context, r *apishim.PtyRequest) (*google_protobuf.Empty, error) {
if r.Pid == 0 {
return nil, fmt.Errorf("pid not provided in request")
return nil, errors.Errorf("pid not provided in request")
}
ws := console.WinSize{
Width: uint16(r.Width),
@ -114,7 +116,7 @@ func (s *Service) Pty(ctx context.Context, r *apishim.PtyRequest) (*google_proto
p, ok := s.processes[int(r.Pid)]
s.mu.Unlock()
if !ok {
return nil, fmt.Errorf("process does not exist %d", r.Pid)
return nil, errors.Errorf("process does not exist %d", r.Pid)
}
if err := p.Resize(ws); err != nil {
return nil, err
@ -134,6 +136,8 @@ func (s *Service) Events(r *apishim.EventsRequest, stream apishim.Shim_EventsSer
func (s *Service) State(ctx context.Context, r *apishim.StateRequest) (*apishim.StateResponse, error) {
o := &apishim.StateResponse{
ID: s.id,
Bundle: s.bundle,
InitPid: uint32(s.initProcess.Pid()),
Processes: []*apishim.Process{},
}
s.mu.Lock()

View file

@ -3,12 +3,19 @@ package supervisor
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
api "github.com/docker/containerd/api/execution"
"github.com/docker/containerd/api/shim"
"github.com/docker/containerd/events"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/log"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -19,16 +26,22 @@ var (
// New creates a new GRPC services for execution
func New(ctx context.Context, root string) (*Service, error) {
clients, err := loadClients(root)
ctx = log.WithModule(ctx, "supervisor")
log.G(ctx).WithField("root", root).Debugf("New()")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, errors.Wrapf(err, "unable to create root directory %q", root)
}
clients, err := loadClients(ctx, root)
if err != nil {
return nil, err
}
s := &Service{
root: root,
shims: clients,
ctx: ctx,
}
for _, c := range clients {
if err := s.monitor(c); err != nil {
if err := s.monitor(events.GetPoster(ctx), c); err != nil {
return nil, err
}
}
@ -38,24 +51,23 @@ func New(ctx context.Context, root string) (*Service, error) {
type Service struct {
mu sync.Mutex
ctx context.Context
root string
shims map[string]shim.ShimClient
shims map[string]*shimClient
}
func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
s.mu.Lock()
if _, ok := s.shims[r.ID]; ok {
s.mu.Unlock()
return nil, fmt.Errorf("container already exists %q", r.ID)
}
client, err := newShimClient(filepath.Join(s.root, r.ID))
func (s *Service) CreateContainer(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
client, err := s.newShim(r.ID)
if err != nil {
s.mu.Unlock()
return nil, err
}
s.shims[r.ID] = client
s.mu.Unlock()
if err := s.monitor(client); err != nil {
defer func() {
if err != nil {
s.removeShim(r.ID)
}
}()
if err := s.monitor(events.GetPoster(ctx), client); err != nil {
return nil, err
}
createResponse, err := client.Create(ctx, &shim.CreateRequest{
@ -67,8 +79,9 @@ func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*a
Stderr: r.Stderr,
})
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "shim create request failed")
}
client.initPid = createResponse.Pid
return &api.CreateContainerResponse{
Container: &api.Container{
ID: r.ID,
@ -79,7 +92,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*a
}, nil
}
func (s *Service) Start(ctx context.Context, r *api.StartContainerRequest) (*google_protobuf.Empty, error) {
func (s *Service) StartContainer(ctx context.Context, r *api.StartContainerRequest) (*google_protobuf.Empty, error) {
client, err := s.getShim(r.ID)
if err != nil {
return nil, err
@ -90,21 +103,22 @@ func (s *Service) Start(ctx context.Context, r *api.StartContainerRequest) (*goo
return empty, nil
}
func (s *Service) Delete(ctx context.Context, r *api.DeleteContainerRequest) (*google_protobuf.Empty, error) {
func (s *Service) DeleteContainer(ctx context.Context, r *api.DeleteContainerRequest) (*google_protobuf.Empty, error) {
client, err := s.getShim(r.ID)
if err != nil {
return nil, err
}
_, err = client.Delete(ctx, &shim.DeleteRequest{
Pid: r.Pid,
Pid: client.initPid,
})
if err != nil {
return nil, err
}
s.removeShim(r.ID)
return empty, nil
}
func (s *Service) List(ctx context.Context, r *api.ListContainersRequest) (*api.ListContainersResponse, error) {
func (s *Service) ListContainers(ctx context.Context, r *api.ListContainersRequest) (*api.ListContainersResponse, error) {
resp := &api.ListContainersResponse{}
for _, client := range s.shims {
status, err := client.State(ctx, &shim.StateRequest{})
@ -118,7 +132,7 @@ func (s *Service) List(ctx context.Context, r *api.ListContainersRequest) (*api.
}
return resp, nil
}
func (s *Service) Get(ctx context.Context, r *api.GetContainerRequest) (*api.GetContainerResponse, error) {
func (s *Service) GetContainer(ctx context.Context, r *api.GetContainerRequest) (*api.GetContainerResponse, error) {
client, err := s.getShim(r.ID)
if err != nil {
return nil, err
@ -136,12 +150,12 @@ func (s *Service) Get(ctx context.Context, r *api.GetContainerRequest) (*api.Get
}, nil
}
func (s *Service) Update(ctx context.Context, r *api.UpdateContainerRequest) (*google_protobuf.Empty, error) {
func (s *Service) UpdateContainer(ctx context.Context, r *api.UpdateContainerRequest) (*google_protobuf.Empty, error) {
panic("not implemented")
return empty, nil
}
func (s *Service) Pause(ctx context.Context, r *api.PauseContainerRequest) (*google_protobuf.Empty, error) {
func (s *Service) PauseContainer(ctx context.Context, r *api.PauseContainerRequest) (*google_protobuf.Empty, error) {
client, err := s.getShim(r.ID)
if err != nil {
return nil, err
@ -149,7 +163,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseContainerRequest) (*goo
return client.Pause(ctx, &shim.PauseRequest{})
}
func (s *Service) Resume(ctx context.Context, r *api.ResumeContainerRequest) (*google_protobuf.Empty, error) {
func (s *Service) ResumeContainer(ctx context.Context, r *api.ResumeContainerRequest) (*google_protobuf.Empty, error) {
client, err := s.getShim(r.ID)
if err != nil {
return nil, err
@ -158,7 +172,35 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeContainerRequest) (*g
}
func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) (*api.StartProcessResponse, error) {
panic("not implemented")
client, err := s.getShim(r.ContainerID)
if err != nil {
return nil, err
}
er := &shim.ExecRequest{
Terminal: r.Console,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Args: r.Process.Args,
Env: r.Process.Env,
Cwd: r.Process.Cwd,
}
if r.Process.User != nil {
er.User.Uid = r.Process.User.Uid
er.User.Gid = r.Process.User.Gid
er.User.AdditionalGids = r.Process.User.AdditionalGids
}
resp, err := client.Exec(ctx, er)
if err != nil {
return nil, errors.Wrapf(err, "failed to exec into container %q", r.ContainerID)
}
r.Process.Pid = resp.Pid
return &api.StartProcessResponse{
Process: r.Process,
}, nil
}
// containerd managed execs + system pids forked in container
@ -171,7 +213,20 @@ func (s *Service) SignalProcess(ctx context.Context, r *api.SignalProcessRequest
}
func (s *Service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest) (*google_protobuf.Empty, error) {
panic("not implemented")
client, err := s.getShim(r.ContainerID)
if err != nil {
return nil, err
}
_, err = client.Delete(ctx, &shim.DeleteRequest{
Pid: r.Pid,
})
if err != nil {
return nil, err
}
if r.Pid == client.initPid {
s.removeShim(r.ContainerID)
}
return empty, nil
}
func (s *Service) ListProcesses(ctx context.Context, r *api.ListProcessesRequest) (*api.ListProcessesResponse, error) {
@ -180,13 +235,65 @@ func (s *Service) ListProcesses(ctx context.Context, r *api.ListProcessesRequest
// monitor monitors the shim's event rpc and forwards container and process
// events to callers
func (s *Service) monitor(client shim.ShimClient) error {
func (s *Service) monitor(poster events.Poster, client *shimClient) error {
// we use the service context here because we don't want to be
// tied to the Create rpc call
stream, err := client.Events(s.ctx, &shim.EventsRequest{})
if err != nil {
return errors.Wrapf(err, "failed to get events stream for client at %q", client.root)
}
go func() {
for {
e, err := stream.Recv()
if err != nil {
if err.Error() == "EOF" || strings.Contains(err.Error(), "transport is closing") {
break
}
log.G(s.ctx).WithError(err).WithField("container", client.id).
Warnf("event stream for client at %q got terminated", client.root)
break
}
var topic string
if e.Type == shim.EventType_CREATE {
topic = "containers"
} else {
topic = fmt.Sprintf("containers.%s", e.ID)
}
ctx := events.WithTopic(s.ctx, topic)
poster.Post(ctx, execution.ContainerEvent{
Timestamp: time.Now(),
ID: e.ID,
Type: toExecutionEventType(e.Type),
Pid: e.Pid,
ExitStatus: e.ExitStatus,
})
}
}()
return nil
}
func (s *Service) getShim(id string) (shim.ShimClient, error) {
func (s *Service) newShim(id string) (*shimClient, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.shims[id]; ok {
return nil, errors.Errorf("container %q already exists", id)
}
client, err := newShimClient(filepath.Join(s.root, id), id)
if err != nil {
return nil, err
}
s.shims[id] = client
return client, nil
}
func (s *Service) getShim(id string) (*shimClient, error) {
s.mu.Lock()
defer s.mu.Unlock()
client, ok := s.shims[id]
if !ok {
return nil, fmt.Errorf("container does not exist %q", id)
@ -194,22 +301,40 @@ func (s *Service) getShim(id string) (shim.ShimClient, error) {
return client, nil
}
func loadClients(root string) (map[string]shim.ShimClient, error) {
func (s *Service) removeShim(id string) {
s.mu.Lock()
defer s.mu.Unlock()
client, ok := s.shims[id]
if ok {
client.stop()
delete(s.shims, id)
}
}
func loadClients(ctx context.Context, root string) (map[string]*shimClient, error) {
files, err := ioutil.ReadDir(root)
if err != nil {
return nil, err
}
out := make(map[string]shim.ShimClient)
out := make(map[string]*shimClient)
for _, f := range files {
if !f.IsDir() {
continue
}
socket := filepath.Join(root, f.Name(), "shim.sock")
client, err := connectToShim(socket)
//
id := f.Name()
client, err := loadShimClient(filepath.Join(root, id), id)
if err != nil {
return nil, err
log.G(ctx).WithError(err).WithField("id", id).Warn("failed to load container")
// TODO: send an exit event with 255 as exit status
continue
}
out[f.Name()] = client
}
return out, nil
}
func toExecutionEventType(et shim.EventType) string {
return strings.Replace(strings.ToLower(et.String()), "_", "-", -1)
}

View file

@ -1,29 +1,106 @@
package supervisor
import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/docker/containerd/api/shim"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
func newShimClient(root string) (shim.ShimClient, error) {
// TODO: start the shim process
cmd := exec.Command("containerd-shim")
if err := cmd.Start(); err != nil {
return nil, err
func newShimClient(root, id string) (*shimClient, error) {
if err := os.Mkdir(root, 0700); err != nil {
return nil, errors.Wrap(err, "failed to create shim working dir")
}
cmd := exec.Command("containerd-shim")
cmd.Dir = root
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
if err := cmd.Start(); err != nil {
return nil, errors.Wrapf(err, "failed to start shim")
}
socket := filepath.Join(root, "shim.sock")
return connectToShim(socket)
sc, err := connectToShim(socket)
if err != nil {
syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
cmd.Wait()
return nil, err
}
s := &shimClient{
ShimClient: sc,
shimCmd: cmd,
syncCh: make(chan struct{}),
root: root,
id: id,
}
go func() {
cmd.Wait()
close(s.syncCh)
}()
return s, nil
}
func loadShimClient(root, id string) (*shimClient, error) {
socket := filepath.Join(root, "shim.sock")
client, err := connectToShim(socket)
if err != nil {
// TODO: failed to connect to the shim, check if it's alive
// - if it is kill it
// - in both case call runc killall and runc delete on the id
return nil, err
}
resp, err := client.State(context.Background(), &shim.StateRequest{})
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch state for container %s", id)
}
return &shimClient{
ShimClient: client,
root: root,
id: id,
initPid: resp.InitPid,
}, nil
}
type shimClient struct {
shim.ShimClient
shimCmd *exec.Cmd
syncCh chan struct{}
root string
id string
initPid uint32
}
func (s *shimClient) stop() {
if s.shimCmd != nil {
select {
case <-s.syncCh:
default:
syscall.Kill(s.shimCmd.Process.Pid, syscall.SIGTERM)
select {
case <-s.syncCh:
case <-time.After(10 * time.Second):
syscall.Kill(s.shimCmd.Process.Pid, syscall.SIGKILL)
}
}
}
os.RemoveAll(s.root)
}
func connectToShim(socket string) (shim.ShimClient, error) {
@ -33,12 +110,13 @@ func connectToShim(socket string) (shim.ShimClient, error) {
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", socket, timeout)
},
))
// FIXME: probably need a retry here
}),
grpc.WithBlock(),
grpc.WithTimeout(2*time.Second),
)
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", socket), dialOpts...)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to connect to shim via \"%s\"", fmt.Sprintf("unix://%s", socket))
}
return shim.NewShimClient(conn), nil
}

476
vendor/github.com/nats-io/go-nats-streaming/stan.go generated vendored Normal file
View file

@ -0,0 +1,476 @@
// Copyright 2016 Apcera Inc. All rights reserved.
// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io).
package stan
import (
"errors"
"fmt"
"runtime"
"sync"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming/pb"
"github.com/nats-io/nuid"
)
// Version is the NATS Streaming Go Client version
const Version = "0.3.4"
const (
// DefaultNatsURL is the default URL the client connects to
DefaultNatsURL = "nats://localhost:4222"
// DefaultConnectWait is the default timeout used for the connect operation
DefaultConnectWait = 2 * time.Second
// DefaultDiscoverPrefix is the prefix subject used to connect to the NATS Streaming server
DefaultDiscoverPrefix = "_STAN.discover"
// DefaultACKPrefix is the prefix subject used to send ACKs to the NATS Streaming server
DefaultACKPrefix = "_STAN.acks"
// DefaultMaxPubAcksInflight is the default maximum number of published messages
// without outstanding ACKs from the server
DefaultMaxPubAcksInflight = 16384
)
// Conn represents a connection to the NATS Streaming subsystem. It can Publish and
// Subscribe to messages within the NATS Streaming cluster.
type Conn interface {
// Publish
Publish(subject string, data []byte) error
PublishAsync(subject string, data []byte, ah AckHandler) (string, error)
// Subscribe
Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)
// QueueSubscribe
QueueSubscribe(subject, qgroup string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)
// Close
Close() error
// NatsConn returns the underlying NATS conn. Use this with care. For
// example, closing the wrapped NATS conn will put the NATS Streaming Conn
// in an invalid state.
NatsConn() *nats.Conn
}
// Errors
var (
ErrConnectReqTimeout = errors.New("stan: connect request timeout")
ErrCloseReqTimeout = errors.New("stan: close request timeout")
ErrSubReqTimeout = errors.New("stan: subscribe request timeout")
ErrUnsubReqTimeout = errors.New("stan: unsubscribe request timeout")
ErrConnectionClosed = errors.New("stan: connection closed")
ErrTimeout = errors.New("stan: publish ack timeout")
ErrBadAck = errors.New("stan: malformed ack")
ErrBadSubscription = errors.New("stan: invalid subscription")
ErrBadConnection = errors.New("stan: invalid connection")
ErrManualAck = errors.New("stan: cannot manually ack in auto-ack mode")
ErrNilMsg = errors.New("stan: nil message")
ErrNoServerSupport = errors.New("stan: not supported by server")
)
// AckHandler is used for Async Publishing to provide status of the ack.
// The func will be passed teh GUID and any error state. No error means the
// message was successfully received by NATS Streaming.
type AckHandler func(string, error)
// Options can be used to a create a customized connection.
type Options struct {
NatsURL string
NatsConn *nats.Conn
ConnectTimeout time.Duration
AckTimeout time.Duration
DiscoverPrefix string
MaxPubAcksInflight int
}
// DefaultOptions are the NATS Streaming client's default options
var DefaultOptions = Options{
NatsURL: DefaultNatsURL,
ConnectTimeout: DefaultConnectWait,
AckTimeout: DefaultAckWait,
DiscoverPrefix: DefaultDiscoverPrefix,
MaxPubAcksInflight: DefaultMaxPubAcksInflight,
}
// Option is a function on the options for a connection.
type Option func(*Options) error
// NatsURL is an Option to set the URL the client should connect to.
func NatsURL(u string) Option {
return func(o *Options) error {
o.NatsURL = u
return nil
}
}
// ConnectWait is an Option to set the timeout for establishing a connection.
func ConnectWait(t time.Duration) Option {
return func(o *Options) error {
o.ConnectTimeout = t
return nil
}
}
// PubAckWait is an Option to set the timeout for waiting for an ACK for a
// published message.
func PubAckWait(t time.Duration) Option {
return func(o *Options) error {
o.AckTimeout = t
return nil
}
}
// MaxPubAcksInflight is an Option to set the maximum number of published
// messages without outstanding ACKs from the server.
func MaxPubAcksInflight(max int) Option {
return func(o *Options) error {
o.MaxPubAcksInflight = max
return nil
}
}
// NatsConn is an Option to set the underlying NATS connection to be used
// by a NATS Streaming Conn object.
func NatsConn(nc *nats.Conn) Option {
return func(o *Options) error {
o.NatsConn = nc
return nil
}
}
// A conn represents a bare connection to a stan cluster.
type conn struct {
sync.RWMutex
clientID string
serverID string
pubPrefix string // Publish prefix set by stan, append our subject.
subRequests string // Subject to send subscription requests.
unsubRequests string // Subject to send unsubscribe requests.
subCloseRequests string // Subject to send subscription close requests.
closeRequests string // Subject to send close requests.
ackSubject string // publish acks
ackSubscription *nats.Subscription
hbSubscription *nats.Subscription
subMap map[string]*subscription
pubAckMap map[string]*ack
pubAckChan chan (struct{})
opts Options
nc *nats.Conn
ncOwned bool // NATS Streaming created the connection, so needs to close it.
}
// Closure for ack contexts.
type ack struct {
t *time.Timer
ah AckHandler
ch chan error
}
// Connect will form a connection to the NATS Streaming subsystem.
func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) {
// Process Options
c := conn{clientID: clientID, opts: DefaultOptions}
for _, opt := range options {
if err := opt(&c.opts); err != nil {
return nil, err
}
}
// Check if the user has provided a connection as an option
c.nc = c.opts.NatsConn
// Create a NATS connection if it doesn't exist.
if c.nc == nil {
nc, err := nats.Connect(c.opts.NatsURL)
if err != nil {
return nil, err
}
c.nc = nc
c.ncOwned = true
} else if !c.nc.IsConnected() {
// Bail if the custom NATS connection is disconnected
return nil, ErrBadConnection
}
// Create a heartbeat inbox
hbInbox := nats.NewInbox()
var err error
if c.hbSubscription, err = c.nc.Subscribe(hbInbox, c.processHeartBeat); err != nil {
c.Close()
return nil, err
}
// Send Request to discover the cluster
discoverSubject := c.opts.DiscoverPrefix + "." + stanClusterID
req := &pb.ConnectRequest{ClientID: clientID, HeartbeatInbox: hbInbox}
b, _ := req.Marshal()
reply, err := c.nc.Request(discoverSubject, b, c.opts.ConnectTimeout)
if err != nil {
c.Close()
if err == nats.ErrTimeout {
return nil, ErrConnectReqTimeout
}
return nil, err
}
// Process the response, grab server pubPrefix
cr := &pb.ConnectResponse{}
err = cr.Unmarshal(reply.Data)
if err != nil {
c.Close()
return nil, err
}
if cr.Error != "" {
c.Close()
return nil, errors.New(cr.Error)
}
// Capture cluster configuration endpoints to publish and subscribe/unsubscribe.
c.pubPrefix = cr.PubPrefix
c.subRequests = cr.SubRequests
c.unsubRequests = cr.UnsubRequests
c.subCloseRequests = cr.SubCloseRequests
c.closeRequests = cr.CloseRequests
// Setup the ACK subscription
c.ackSubject = DefaultACKPrefix + "." + nuid.Next()
if c.ackSubscription, err = c.nc.Subscribe(c.ackSubject, c.processAck); err != nil {
c.Close()
return nil, err
}
c.ackSubscription.SetPendingLimits(1024*1024, 32*1024*1024)
c.pubAckMap = make(map[string]*ack)
// Create Subscription map
c.subMap = make(map[string]*subscription)
c.pubAckChan = make(chan struct{}, c.opts.MaxPubAcksInflight)
// Attach a finalizer
runtime.SetFinalizer(&c, func(sc *conn) { sc.Close() })
return &c, nil
}
// Close a connection to the stan system.
func (sc *conn) Close() error {
if sc == nil {
return ErrBadConnection
}
sc.Lock()
defer sc.Unlock()
if sc.nc == nil {
// We are already closed.
return nil
}
// Capture for NATS calls below.
nc := sc.nc
if sc.ncOwned {
defer nc.Close()
}
// Signals we are closed.
sc.nc = nil
// Now close ourselves.
if sc.ackSubscription != nil {
sc.ackSubscription.Unsubscribe()
}
req := &pb.CloseRequest{ClientID: sc.clientID}
b, _ := req.Marshal()
reply, err := nc.Request(sc.closeRequests, b, sc.opts.ConnectTimeout)
if err != nil {
if err == nats.ErrTimeout {
return ErrCloseReqTimeout
}
return err
}
cr := &pb.CloseResponse{}
err = cr.Unmarshal(reply.Data)
if err != nil {
return err
}
if cr.Error != "" {
return errors.New(cr.Error)
}
return nil
}
// NatsConn returns the underlying NATS conn. Use this with care. For example,
// closing the wrapped NATS conn will put the NATS Streaming Conn in an invalid
// state.
func (sc *conn) NatsConn() *nats.Conn {
return sc.nc
}
// Process a heartbeat from the NATS Streaming cluster
func (sc *conn) processHeartBeat(m *nats.Msg) {
// No payload assumed, just reply.
sc.RLock()
nc := sc.nc
sc.RUnlock()
if nc != nil {
nc.Publish(m.Reply, nil)
}
}
// Process an ack from the NATS Streaming cluster
func (sc *conn) processAck(m *nats.Msg) {
pa := &pb.PubAck{}
err := pa.Unmarshal(m.Data)
if err != nil {
// FIXME, make closure to have context?
fmt.Printf("Error processing unmarshal\n")
return
}
// Remove
a := sc.removeAck(pa.Guid)
if a != nil {
// Capture error if it exists.
if pa.Error != "" {
err = errors.New(pa.Error)
}
if a.ah != nil {
// Perform the ackHandler callback
a.ah(pa.Guid, err)
} else if a.ch != nil {
// Send to channel directly
a.ch <- err
}
}
}
// Publish will publish to the cluster and wait for an ACK.
func (sc *conn) Publish(subject string, data []byte) error {
ch := make(chan error)
_, err := sc.publishAsync(subject, data, nil, ch)
if err == nil {
err = <-ch
}
return err
}
// PublishAsync will publish to the cluster on pubPrefix+subject and asynchronously
// process the ACK or error state. It will return the GUID for the message being sent.
func (sc *conn) PublishAsync(subject string, data []byte, ah AckHandler) (string, error) {
return sc.publishAsync(subject, data, ah, nil)
}
func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan error) (string, error) {
a := &ack{ah: ah, ch: ch}
sc.Lock()
if sc.nc == nil {
sc.Unlock()
return "", ErrConnectionClosed
}
subj := sc.pubPrefix + "." + subject
// This is only what we need from PubMsg in the timer below,
// so do this so that pe doesn't escape (and we same on new object)
peGUID := nuid.Next()
pe := &pb.PubMsg{ClientID: sc.clientID, Guid: peGUID, Subject: subject, Data: data}
b, _ := pe.Marshal()
// Map ack to guid.
sc.pubAckMap[peGUID] = a
// snapshot
ackSubject := sc.ackSubject
ackTimeout := sc.opts.AckTimeout
pac := sc.pubAckChan
sc.Unlock()
// Use the buffered channel to control the number of outstanding acks.
pac <- struct{}{}
err := sc.nc.PublishRequest(subj, ackSubject, b)
if err != nil {
sc.removeAck(peGUID)
return "", err
}
// Setup the timer for expiration.
sc.Lock()
a.t = time.AfterFunc(ackTimeout, func() {
sc.removeAck(peGUID)
if a.ah != nil {
ah(peGUID, ErrTimeout)
} else if a.ch != nil {
a.ch <- ErrTimeout
}
})
sc.Unlock()
return peGUID, nil
}
// removeAck removes the ack from the pubAckMap and cancels any state, e.g. timers
func (sc *conn) removeAck(guid string) *ack {
var t *time.Timer
sc.Lock()
a := sc.pubAckMap[guid]
if a != nil {
t = a.t
delete(sc.pubAckMap, guid)
}
pac := sc.pubAckChan
sc.Unlock()
// Cancel timer if needed.
if t != nil {
t.Stop()
}
// Remove from channel to unblock PublishAsync
if a != nil && len(pac) > 0 {
<-pac
}
return a
}
// Process an msg from the NATS Streaming cluster
func (sc *conn) processMsg(raw *nats.Msg) {
msg := &Msg{}
err := msg.Unmarshal(raw.Data)
if err != nil {
panic("Error processing unmarshal for msg")
}
// Lookup the subscription
sc.RLock()
nc := sc.nc
isClosed := nc == nil
sub := sc.subMap[raw.Subject]
sc.RUnlock()
// Check if sub is no longer valid or connection has been closed.
if sub == nil || isClosed {
return
}
// Store in msg for backlink
msg.Sub = sub
sub.RLock()
cb := sub.cb
ackSubject := sub.ackInbox
isManualAck := sub.opts.ManualAcks
subsc := sub.sc // Can be nil if sub has been unsubscribed.
sub.RUnlock()
// Perform the callback
if cb != nil && subsc != nil {
cb(msg)
}
// Proces auto-ack
if !isManualAck && nc != nil {
ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence}
b, _ := ack.Marshal()
if err := nc.Publish(ackSubject, b); err != nil {
// FIXME(dlc) - Async error handler? Retry?
}
}
}

376
vendor/github.com/nats-io/go-nats-streaming/sub.go generated vendored Normal file
View file

@ -0,0 +1,376 @@
// Copyright 2016 Apcera Inc. All rights reserved.
// Package stan is a Go client for the NATS Streaming messaging system (https://nats.io).
package stan
import (
"errors"
"sync"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming/pb"
)
const (
// DefaultAckWait indicates how long the server should wait for an ACK before resending a message
DefaultAckWait = 30 * time.Second
// DefaultMaxInflight indicates how many messages with outstanding ACKs the server can send
DefaultMaxInflight = 1024
)
// Msg is the client defined message, which includes proto, then back link to subscription.
type Msg struct {
pb.MsgProto // MsgProto: Seq, Subject, Reply[opt], Data, Timestamp, CRC32[opt]
Sub Subscription
}
// Subscriptions and Options
// Subscription represents a subscription within the NATS Streaming cluster. Subscriptions
// will be rate matched and follow at-least delivery semantics.
type Subscription interface {
// Unsubscribe removes interest in the subscription.
// For durables, it means that the durable interest is also removed from
// the server. Restarting a durable with the same name will not resume
// the subscription, it will be considered a new one.
Unsubscribe() error
// Close removes this subscriber from the server, but unlike Unsubscribe(),
// the durable interest is not removed. If the client has connected to a server
// for which this feature is not available, Close() will return a ErrNoServerSupport
// error.
Close() error
}
// A subscription represents a subscription to a stan cluster.
type subscription struct {
sync.RWMutex
sc *conn
subject string
qgroup string
inbox string
ackInbox string
inboxSub *nats.Subscription
opts SubscriptionOptions
cb MsgHandler
}
// SubscriptionOption is a function on the options for a subscription.
type SubscriptionOption func(*SubscriptionOptions) error
// MsgHandler is a callback function that processes messages delivered to
// asynchronous subscribers.
type MsgHandler func(msg *Msg)
// SubscriptionOptions are used to control the Subscription's behavior.
type SubscriptionOptions struct {
// DurableName, if set will survive client restarts.
DurableName string
// Controls the number of messages the cluster will have inflight without an ACK.
MaxInflight int
// Controls the time the cluster will wait for an ACK for a given message.
AckWait time.Duration
// StartPosition enum from proto.
StartAt pb.StartPosition
// Optional start sequence number.
StartSequence uint64
// Optional start time.
StartTime time.Time
// Option to do Manual Acks
ManualAcks bool
}
// DefaultSubscriptionOptions are the default subscriptions' options
var DefaultSubscriptionOptions = SubscriptionOptions{
MaxInflight: DefaultMaxInflight,
AckWait: DefaultAckWait,
}
// MaxInflight is an Option to set the maximum number of messages the cluster will send
// without an ACK.
func MaxInflight(m int) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.MaxInflight = m
return nil
}
}
// AckWait is an Option to set the timeout for waiting for an ACK from the cluster's
// point of view for delivered messages.
func AckWait(t time.Duration) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.AckWait = t
return nil
}
}
// StartAt sets the desired start position for the message stream.
func StartAt(sp pb.StartPosition) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = sp
return nil
}
}
// StartAtSequence sets the desired start sequence position and state.
func StartAtSequence(seq uint64) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = pb.StartPosition_SequenceStart
o.StartSequence = seq
return nil
}
}
// StartAtTime sets the desired start time position and state.
func StartAtTime(start time.Time) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = pb.StartPosition_TimeDeltaStart
o.StartTime = start
return nil
}
}
// StartAtTimeDelta sets the desired start time position and state using the delta.
func StartAtTimeDelta(ago time.Duration) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = pb.StartPosition_TimeDeltaStart
o.StartTime = time.Now().Add(-ago)
return nil
}
}
// StartWithLastReceived is a helper function to set start position to last received.
func StartWithLastReceived() SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = pb.StartPosition_LastReceived
return nil
}
}
// DeliverAllAvailable will deliver all messages available.
func DeliverAllAvailable() SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = pb.StartPosition_First
return nil
}
}
// SetManualAckMode will allow clients to control their own acks to delivered messages.
func SetManualAckMode() SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.ManualAcks = true
return nil
}
}
// DurableName sets the DurableName for the subcriber.
func DurableName(name string) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.DurableName = name
return nil
}
}
// Subscribe will perform a subscription with the given options to the NATS Streaming cluster.
func (sc *conn) Subscribe(subject string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
return sc.subscribe(subject, "", cb, options...)
}
// QueueSubscribe will perform a queue subscription with the given options to the NATS Streaming cluster.
func (sc *conn) QueueSubscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
return sc.subscribe(subject, qgroup, cb, options...)
}
// subscribe will perform a subscription with the given options to the NATS Streaming cluster.
func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
sub := &subscription{subject: subject, qgroup: qgroup, inbox: nats.NewInbox(), cb: cb, sc: sc, opts: DefaultSubscriptionOptions}
for _, opt := range options {
if err := opt(&sub.opts); err != nil {
return nil, err
}
}
sc.Lock()
if sc.nc == nil {
sc.Unlock()
return nil, ErrConnectionClosed
}
// Register subscription.
sc.subMap[sub.inbox] = sub
nc := sc.nc
sc.Unlock()
// Hold lock throughout.
sub.Lock()
defer sub.Unlock()
// Listen for actual messages.
nsub, err := nc.Subscribe(sub.inbox, sc.processMsg)
if err != nil {
return nil, err
}
sub.inboxSub = nsub
// Create a subscription request
// FIXME(dlc) add others.
sr := &pb.SubscriptionRequest{
ClientID: sc.clientID,
Subject: subject,
QGroup: qgroup,
Inbox: sub.inbox,
MaxInFlight: int32(sub.opts.MaxInflight),
AckWaitInSecs: int32(sub.opts.AckWait / time.Second),
StartPosition: sub.opts.StartAt,
DurableName: sub.opts.DurableName,
}
// Conditionals
switch sr.StartPosition {
case pb.StartPosition_TimeDeltaStart:
sr.StartTimeDelta = time.Now().UnixNano() - sub.opts.StartTime.UnixNano()
case pb.StartPosition_SequenceStart:
sr.StartSequence = sub.opts.StartSequence
}
b, _ := sr.Marshal()
reply, err := sc.nc.Request(sc.subRequests, b, sc.opts.ConnectTimeout)
if err != nil {
sub.inboxSub.Unsubscribe()
if err == nats.ErrTimeout {
err = ErrSubReqTimeout
}
return nil, err
}
r := &pb.SubscriptionResponse{}
if err := r.Unmarshal(reply.Data); err != nil {
sub.inboxSub.Unsubscribe()
return nil, err
}
if r.Error != "" {
sub.inboxSub.Unsubscribe()
return nil, errors.New(r.Error)
}
sub.ackInbox = r.AckInbox
return sub, nil
}
// closeOrUnsubscribe performs either close or unsubsribe based on
// given boolean.
func (sub *subscription) closeOrUnsubscribe(doClose bool) error {
if sub == nil {
return ErrBadSubscription
}
sub.Lock()
sc := sub.sc
if sc == nil {
// Already closed.
sub.Unlock()
return ErrBadSubscription
}
sub.sc = nil
sub.inboxSub.Unsubscribe()
sub.inboxSub = nil
sub.Unlock()
if sc == nil {
return ErrBadSubscription
}
sc.Lock()
if sc.nc == nil {
sc.Unlock()
return ErrConnectionClosed
}
delete(sc.subMap, sub.inbox)
reqSubject := sc.unsubRequests
if doClose {
reqSubject = sc.subCloseRequests
if reqSubject == "" {
sc.Unlock()
return ErrNoServerSupport
}
}
// Snapshot connection to avoid data race, since the connection may be
// closing while we try to send the request
nc := sc.nc
sc.Unlock()
usr := &pb.UnsubscribeRequest{
ClientID: sc.clientID,
Subject: sub.subject,
Inbox: sub.ackInbox,
}
b, _ := usr.Marshal()
reply, err := nc.Request(reqSubject, b, sc.opts.ConnectTimeout)
if err != nil {
if err == nats.ErrTimeout {
if doClose {
return ErrCloseReqTimeout
}
return ErrUnsubReqTimeout
}
return err
}
r := &pb.SubscriptionResponse{}
if err := r.Unmarshal(reply.Data); err != nil {
return err
}
if r.Error != "" {
return errors.New(r.Error)
}
return nil
}
// Unsubscribe implements the Subscription interface
func (sub *subscription) Unsubscribe() error {
return sub.closeOrUnsubscribe(false)
}
// Close implements the Subscription interface
func (sub *subscription) Close() error {
return sub.closeOrUnsubscribe(true)
}
// Ack manually acknowledges a message.
// The subscriber had to be created with SetManualAckMode() option.
func (msg *Msg) Ack() error {
if msg == nil {
return ErrNilMsg
}
// Look up subscription
sub := msg.Sub.(*subscription)
if sub == nil {
return ErrBadSubscription
}
sub.RLock()
ackSubject := sub.ackInbox
isManualAck := sub.opts.ManualAcks
sc := sub.sc
sub.RUnlock()
// Check for error conditions.
if sc == nil {
return ErrBadSubscription
}
// Get nc from the connection (needs locking to avoid race)
sc.RLock()
nc := sc.nc
sc.RUnlock()
if nc == nil {
return ErrBadConnection
}
if !isManualAck {
return ErrManualAck
}
// Ack here.
ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence}
b, _ := ack.Marshal()
return nc.Publish(ackSubject, b)
}