diff --git a/integration-test/check_test.go b/integration-test/check_test.go index a3214aa..6aabd65 100644 --- a/integration-test/check_test.go +++ b/integration-test/check_test.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "sync" "testing" "time" @@ -31,18 +32,21 @@ func Test(t *testing.T) { } func init() { - check.Suite(&ContainerdSuite{}) + check.Suite(&ContainerdSuite{lastEventTs: uint64(time.Now().Unix())}) } type ContainerdSuite struct { cwd string outputDir string + stateDir string + grpcSocket string logFile *os.File cd *exec.Cmd syncChild chan error grpcClient types.APIClient eventFiltersMutex sync.Mutex eventFilters map[string]func(event *types.Event) + lastEventTs uint64 } // getClient returns a connection to the Suite containerd @@ -69,15 +73,18 @@ func (cs *ContainerdSuite) getClient(socket string) error { // via `SetContainerEventFilter()`, it will be invoked every time an // event for that id is received func (cs *ContainerdSuite) ContainerdEventsHandler(events types.API_EventsClient) { - timestamp := uint64(time.Now().Unix()) for { e, err := events.Recv() if err != nil { + // If daemon died or exited, return + if strings.Contains(err.Error(), "transport is closing") { + break + } time.Sleep(1 * time.Second) - events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: timestamp}) + events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs}) continue } - timestamp = e.Timestamp + cs.lastEventTs = e.Timestamp cs.eventFiltersMutex.Lock() if f, ok := cs.eventFilters[e.Id]; ok { f(e) @@ -97,6 +104,73 @@ func generateReferenceSpecs(destination string) error { return specs.Run() } +func (cs *ContainerdSuite) StopDaemon(kill bool) { + if cs.cd == nil { + return + } + + if kill { + cs.cd.Process.Kill() + <-cs.syncChild + cs.cd = nil + } else { + // Terminate gently if possible + cs.cd.Process.Signal(os.Interrupt) + + done := false + for done == false { + select { + case err := <-cs.syncChild: + if err != nil { + fmt.Printf("master containerd did not exit cleanly: %v\n", err) + } + done = true + case <-time.After(3 * time.Second): + fmt.Println("Timeout while waiting for containerd to exit, killing it!") + cs.cd.Process.Kill() + } + } + } +} + +func (cs *ContainerdSuite) RestartDaemon(kill bool) error { + cs.StopDaemon(kill) + + cd := exec.Command("containerd", "--debug", + "--state-dir", cs.stateDir, + "--listen", cs.grpcSocket, + "--metrics-interval", "0m0s", + "--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cs.outputDir, "runc")), + ) + cd.Stderr = cs.logFile + cd.Stdout = cs.logFile + + if err := cd.Start(); err != nil { + return err + } + cs.cd = cd + + if err := cs.getClient(cs.grpcSocket); err != nil { + // Kill the daemon + cs.cd.Process.Kill() + return err + } + + // Monitor events + events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs}) + if err != nil { + return err + } + + go cs.ContainerdEventsHandler(events) + + go func() { + cs.syncChild <- cd.Wait() + }() + + return nil +} + func (cs *ContainerdSuite) SetUpSuite(c *check.C) { bundleMap = make(map[string]Bundle) cs.eventFilters = make(map[string]func(event *types.Event)) @@ -122,14 +196,14 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) { } // Create our output directory - od := fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000")) - cdStateDir := fmt.Sprintf("%s/containerd-master", od) - if err := os.MkdirAll(cdStateDir, 0755); err != nil { - c.Fatalf("Unable to created output directory '%s': %v", cdStateDir, err) + cs.outputDir = fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000")) + cs.stateDir = filepath.Join(cs.outputDir, "containerd-master") + if err := os.MkdirAll(cs.stateDir, 0755); err != nil { + c.Fatalf("Unable to created output directory '%s': %v", cs.stateDir, err) } - cdGRPCSock := filepath.Join(od, "containerd-master", "containerd.sock") - cdLogFile := filepath.Join(od, "containerd-master", "containerd.log") + cs.grpcSocket = filepath.Join(cs.outputDir, "containerd-master", "containerd.sock") + cdLogFile := filepath.Join(cs.outputDir, "containerd-master", "containerd.log") f, err := os.OpenFile(cdLogFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR|os.O_SYNC, 0777) if err != nil { @@ -137,39 +211,8 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) { } cs.logFile = f - cd := exec.Command("containerd", "--debug", - "--state-dir", cdStateDir, - "--listen", cdGRPCSock, - "--metrics-interval", "0m0s", - "--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cdStateDir, "runc")), - ) - cd.Stderr = f - cd.Stdout = f - - if err := cd.Start(); err != nil { - c.Fatalf("Unable to start the master containerd: %v", err) - } - - cs.outputDir = od - cs.cd = cd cs.syncChild = make(chan error) - if err := cs.getClient(cdGRPCSock); err != nil { - // Kill the daemon - cs.cd.Process.Kill() - c.Fatalf("Failed to connect to daemon: %v", err) - } - - // Monitor events - events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{}) - if err != nil { - c.Fatalf("Could not register containerd event handler: %v", err) - } - - go cs.ContainerdEventsHandler(events) - - go func() { - cs.syncChild <- cd.Wait() - }() + cs.RestartDaemon(false) } func (cs *ContainerdSuite) TearDownSuite(c *check.C) { diff --git a/integration-test/container_utils_test.go b/integration-test/container_utils_test.go index 95bc2fd..ec836a0 100644 --- a/integration-test/container_utils_test.go +++ b/integration-test/container_utils_test.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "sort" "syscall" "github.com/docker/containerd/api/grpc/types" @@ -264,3 +265,23 @@ func (cs *ContainerdSuite) AddProcessToContainer(init *containerProcess, pid, cw return c, nil } + +type containerSorter struct { + c []*types.Container +} + +func (s *containerSorter) Len() int { + return len(s.c) +} + +func (s *containerSorter) Swap(i, j int) { + s.c[i], s.c[j] = s.c[j], s.c[i] +} + +func (s *containerSorter) Less(i, j int) bool { + return s.c[i].Id < s.c[j].Id +} + +func sortContainers(c []*types.Container) { + sort.Sort(&containerSorter{c}) +} diff --git a/integration-test/start_test.go b/integration-test/start_test.go index a2312fe..8990830 100644 --- a/integration-test/start_test.go +++ b/integration-test/start_test.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "path/filepath" "syscall" "time" @@ -311,3 +312,116 @@ func (cs *ContainerdSuite) TestStartBusyboxTopPauseResume(t *check.C) { t.Assert(containers[0].Id, checker.Equals, "top") t.Assert(containers[0].Status, checker.Equals, "running") } + +func (cs *ContainerdSuite) TestRestart(t *check.C) { + bundleName := "busybox-top" + if err := CreateBusyboxBundle(bundleName, []string{"top"}); err != nil { + t.Fatal(err) + } + + totalCtr := 10 + + for i := 0; i < totalCtr; i++ { + containerId := fmt.Sprintf("top%d", i) + c, err := cs.StartContainer(containerId, bundleName) + if err != nil { + t.Fatal(err) + } + + e := c.GetNextEvent() + + t.Assert(*e, checker.Equals, types.Event{ + Type: "start-container", + Id: containerId, + Status: 0, + Pid: "", + Timestamp: e.Timestamp, + }) + } + + // restart daemon gracefully (SIGINT) + cs.RestartDaemon(false) + + // check that status is running + containers, err := cs.ListRunningContainers() + if err != nil { + t.Fatal(err) + } + sortContainers(containers) + t.Assert(len(containers), checker.Equals, totalCtr) + for i := 0; i < totalCtr; i++ { + t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i)) + t.Assert(containers[i].Status, checker.Equals, "running") + } + + // Now kill daemon (SIGKILL) + cs.StopDaemon(true) + + // Sleep a second to allow thevent e timestamp to change since + // it's second based + <-time.After(3 * time.Second) + + // Kill a couple of containers + killedCtr := map[int]bool{4: true, 2: true} + + var f func(*types.Event) + deathChans := make([]chan error, len(killedCtr)) + deathChansIdx := 0 + + for i, _ := range killedCtr { + ch := make(chan error, 1) + deathChans[deathChansIdx] = ch + deathChansIdx++ + syscall.Kill(int(containers[i].Pids[0]), syscall.SIGKILL) + + // Filter to be notified of their death + containerId := fmt.Sprintf("top%d", i) + f = func(event *types.Event) { + expectedEvent := types.Event{ + Type: "exit", + Id: containerId, + Status: 137, + Pid: "init", + } + expectedEvent.Timestamp = event.Timestamp + if ok := t.Check(*event, checker.Equals, expectedEvent); !ok { + ch <- fmt.Errorf("Unexpected event: %#v", *event) + } else { + ch <- nil + } + } + cs.SetContainerEventFilter(containerId, f) + } + + cs.RestartDaemon(true) + + // Ensure we got our events + for i, _ := range deathChans { + done := false + for done == false { + select { + case err := <-deathChans[i]: + t.Assert(err, checker.Equals, nil) + done = true + case <-time.After(3 * time.Second): + t.Fatal("Exit event for container not received after 3 seconds") + } + } + } + + // check that status is running + containers, err = cs.ListRunningContainers() + if err != nil { + t.Fatal(err) + } + sortContainers(containers) + t.Assert(len(containers), checker.Equals, totalCtr-len(killedCtr)) + idShift := 0 + for i := 0; i < totalCtr-len(killedCtr); i++ { + if _, ok := killedCtr[i+idShift]; ok { + idShift++ + } + t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i+idShift)) + t.Assert(containers[i].Status, checker.Equals, "running") + } +}