Add containerd restore test

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2016-04-20 18:08:36 -07:00
parent 7b2491bccc
commit 4dbc495ff8
3 changed files with 220 additions and 42 deletions

View file

@ -8,6 +8,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -31,18 +32,21 @@ func Test(t *testing.T) {
} }
func init() { func init() {
check.Suite(&ContainerdSuite{}) check.Suite(&ContainerdSuite{lastEventTs: uint64(time.Now().Unix())})
} }
type ContainerdSuite struct { type ContainerdSuite struct {
cwd string cwd string
outputDir string outputDir string
stateDir string
grpcSocket string
logFile *os.File logFile *os.File
cd *exec.Cmd cd *exec.Cmd
syncChild chan error syncChild chan error
grpcClient types.APIClient grpcClient types.APIClient
eventFiltersMutex sync.Mutex eventFiltersMutex sync.Mutex
eventFilters map[string]func(event *types.Event) eventFilters map[string]func(event *types.Event)
lastEventTs uint64
} }
// getClient returns a connection to the Suite containerd // getClient returns a connection to the Suite containerd
@ -69,15 +73,18 @@ func (cs *ContainerdSuite) getClient(socket string) error {
// via `SetContainerEventFilter()`, it will be invoked every time an // via `SetContainerEventFilter()`, it will be invoked every time an
// event for that id is received // event for that id is received
func (cs *ContainerdSuite) ContainerdEventsHandler(events types.API_EventsClient) { func (cs *ContainerdSuite) ContainerdEventsHandler(events types.API_EventsClient) {
timestamp := uint64(time.Now().Unix())
for { for {
e, err := events.Recv() e, err := events.Recv()
if err != nil { if err != nil {
// If daemon died or exited, return
if strings.Contains(err.Error(), "transport is closing") {
break
}
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: timestamp}) events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs})
continue continue
} }
timestamp = e.Timestamp cs.lastEventTs = e.Timestamp
cs.eventFiltersMutex.Lock() cs.eventFiltersMutex.Lock()
if f, ok := cs.eventFilters[e.Id]; ok { if f, ok := cs.eventFilters[e.Id]; ok {
f(e) f(e)
@ -97,6 +104,73 @@ func generateReferenceSpecs(destination string) error {
return specs.Run() return specs.Run()
} }
func (cs *ContainerdSuite) StopDaemon(kill bool) {
if cs.cd == nil {
return
}
if kill {
cs.cd.Process.Kill()
<-cs.syncChild
cs.cd = nil
} else {
// Terminate gently if possible
cs.cd.Process.Signal(os.Interrupt)
done := false
for done == false {
select {
case err := <-cs.syncChild:
if err != nil {
fmt.Printf("master containerd did not exit cleanly: %v\n", err)
}
done = true
case <-time.After(3 * time.Second):
fmt.Println("Timeout while waiting for containerd to exit, killing it!")
cs.cd.Process.Kill()
}
}
}
}
func (cs *ContainerdSuite) RestartDaemon(kill bool) error {
cs.StopDaemon(kill)
cd := exec.Command("containerd", "--debug",
"--state-dir", cs.stateDir,
"--listen", cs.grpcSocket,
"--metrics-interval", "0m0s",
"--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cs.outputDir, "runc")),
)
cd.Stderr = cs.logFile
cd.Stdout = cs.logFile
if err := cd.Start(); err != nil {
return err
}
cs.cd = cd
if err := cs.getClient(cs.grpcSocket); err != nil {
// Kill the daemon
cs.cd.Process.Kill()
return err
}
// Monitor events
events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs})
if err != nil {
return err
}
go cs.ContainerdEventsHandler(events)
go func() {
cs.syncChild <- cd.Wait()
}()
return nil
}
func (cs *ContainerdSuite) SetUpSuite(c *check.C) { func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
bundleMap = make(map[string]Bundle) bundleMap = make(map[string]Bundle)
cs.eventFilters = make(map[string]func(event *types.Event)) cs.eventFilters = make(map[string]func(event *types.Event))
@ -122,14 +196,14 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
} }
// Create our output directory // Create our output directory
od := fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000")) cs.outputDir = fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000"))
cdStateDir := fmt.Sprintf("%s/containerd-master", od) cs.stateDir = filepath.Join(cs.outputDir, "containerd-master")
if err := os.MkdirAll(cdStateDir, 0755); err != nil { if err := os.MkdirAll(cs.stateDir, 0755); err != nil {
c.Fatalf("Unable to created output directory '%s': %v", cdStateDir, err) c.Fatalf("Unable to created output directory '%s': %v", cs.stateDir, err)
} }
cdGRPCSock := filepath.Join(od, "containerd-master", "containerd.sock") cs.grpcSocket = filepath.Join(cs.outputDir, "containerd-master", "containerd.sock")
cdLogFile := filepath.Join(od, "containerd-master", "containerd.log") cdLogFile := filepath.Join(cs.outputDir, "containerd-master", "containerd.log")
f, err := os.OpenFile(cdLogFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR|os.O_SYNC, 0777) f, err := os.OpenFile(cdLogFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR|os.O_SYNC, 0777)
if err != nil { if err != nil {
@ -137,39 +211,8 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
} }
cs.logFile = f cs.logFile = f
cd := exec.Command("containerd", "--debug",
"--state-dir", cdStateDir,
"--listen", cdGRPCSock,
"--metrics-interval", "0m0s",
"--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cdStateDir, "runc")),
)
cd.Stderr = f
cd.Stdout = f
if err := cd.Start(); err != nil {
c.Fatalf("Unable to start the master containerd: %v", err)
}
cs.outputDir = od
cs.cd = cd
cs.syncChild = make(chan error) cs.syncChild = make(chan error)
if err := cs.getClient(cdGRPCSock); err != nil { cs.RestartDaemon(false)
// Kill the daemon
cs.cd.Process.Kill()
c.Fatalf("Failed to connect to daemon: %v", err)
}
// Monitor events
events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{})
if err != nil {
c.Fatalf("Could not register containerd event handler: %v", err)
}
go cs.ContainerdEventsHandler(events)
go func() {
cs.syncChild <- cd.Wait()
}()
} }
func (cs *ContainerdSuite) TearDownSuite(c *check.C) { func (cs *ContainerdSuite) TearDownSuite(c *check.C) {

View file

@ -6,6 +6,7 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"syscall" "syscall"
"github.com/docker/containerd/api/grpc/types" "github.com/docker/containerd/api/grpc/types"
@ -264,3 +265,23 @@ func (cs *ContainerdSuite) AddProcessToContainer(init *containerProcess, pid, cw
return c, nil return c, nil
} }
type containerSorter struct {
c []*types.Container
}
func (s *containerSorter) Len() int {
return len(s.c)
}
func (s *containerSorter) Swap(i, j int) {
s.c[i], s.c[j] = s.c[j], s.c[i]
}
func (s *containerSorter) Less(i, j int) bool {
return s.c[i].Id < s.c[j].Id
}
func sortContainers(c []*types.Container) {
sort.Sort(&containerSorter{c})
}

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"fmt"
"path/filepath" "path/filepath"
"syscall" "syscall"
"time" "time"
@ -311,3 +312,116 @@ func (cs *ContainerdSuite) TestStartBusyboxTopPauseResume(t *check.C) {
t.Assert(containers[0].Id, checker.Equals, "top") t.Assert(containers[0].Id, checker.Equals, "top")
t.Assert(containers[0].Status, checker.Equals, "running") t.Assert(containers[0].Status, checker.Equals, "running")
} }
func (cs *ContainerdSuite) TestRestart(t *check.C) {
bundleName := "busybox-top"
if err := CreateBusyboxBundle(bundleName, []string{"top"}); err != nil {
t.Fatal(err)
}
totalCtr := 10
for i := 0; i < totalCtr; i++ {
containerId := fmt.Sprintf("top%d", i)
c, err := cs.StartContainer(containerId, bundleName)
if err != nil {
t.Fatal(err)
}
e := c.GetNextEvent()
t.Assert(*e, checker.Equals, types.Event{
Type: "start-container",
Id: containerId,
Status: 0,
Pid: "",
Timestamp: e.Timestamp,
})
}
// restart daemon gracefully (SIGINT)
cs.RestartDaemon(false)
// check that status is running
containers, err := cs.ListRunningContainers()
if err != nil {
t.Fatal(err)
}
sortContainers(containers)
t.Assert(len(containers), checker.Equals, totalCtr)
for i := 0; i < totalCtr; i++ {
t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i))
t.Assert(containers[i].Status, checker.Equals, "running")
}
// Now kill daemon (SIGKILL)
cs.StopDaemon(true)
// Sleep a second to allow thevent e timestamp to change since
// it's second based
<-time.After(3 * time.Second)
// Kill a couple of containers
killedCtr := map[int]bool{4: true, 2: true}
var f func(*types.Event)
deathChans := make([]chan error, len(killedCtr))
deathChansIdx := 0
for i, _ := range killedCtr {
ch := make(chan error, 1)
deathChans[deathChansIdx] = ch
deathChansIdx++
syscall.Kill(int(containers[i].Pids[0]), syscall.SIGKILL)
// Filter to be notified of their death
containerId := fmt.Sprintf("top%d", i)
f = func(event *types.Event) {
expectedEvent := types.Event{
Type: "exit",
Id: containerId,
Status: 137,
Pid: "init",
}
expectedEvent.Timestamp = event.Timestamp
if ok := t.Check(*event, checker.Equals, expectedEvent); !ok {
ch <- fmt.Errorf("Unexpected event: %#v", *event)
} else {
ch <- nil
}
}
cs.SetContainerEventFilter(containerId, f)
}
cs.RestartDaemon(true)
// Ensure we got our events
for i, _ := range deathChans {
done := false
for done == false {
select {
case err := <-deathChans[i]:
t.Assert(err, checker.Equals, nil)
done = true
case <-time.After(3 * time.Second):
t.Fatal("Exit event for container not received after 3 seconds")
}
}
}
// check that status is running
containers, err = cs.ListRunningContainers()
if err != nil {
t.Fatal(err)
}
sortContainers(containers)
t.Assert(len(containers), checker.Equals, totalCtr-len(killedCtr))
idShift := 0
for i := 0; i < totalCtr-len(killedCtr); i++ {
if _, ok := killedCtr[i+idShift]; ok {
idShift++
}
t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i+idShift))
t.Assert(containers[i].Status, checker.Equals, "running")
}
}