Merge pull request #212 from mlaventure/test-containerd-restart
Test containerd restart
This commit is contained in:
commit
a8dd154da1
4 changed files with 221 additions and 42 deletions
|
@ -77,6 +77,7 @@ var daemonFlags = []cli.Flag{
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano})
|
||||||
appendPlatformFlags()
|
appendPlatformFlags()
|
||||||
app := cli.NewApp()
|
app := cli.NewApp()
|
||||||
app.Name = "containerd"
|
app.Name = "containerd"
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -31,18 +32,21 @@ func Test(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
check.Suite(&ContainerdSuite{})
|
check.Suite(&ContainerdSuite{lastEventTs: uint64(time.Now().Unix())})
|
||||||
}
|
}
|
||||||
|
|
||||||
type ContainerdSuite struct {
|
type ContainerdSuite struct {
|
||||||
cwd string
|
cwd string
|
||||||
outputDir string
|
outputDir string
|
||||||
|
stateDir string
|
||||||
|
grpcSocket string
|
||||||
logFile *os.File
|
logFile *os.File
|
||||||
cd *exec.Cmd
|
cd *exec.Cmd
|
||||||
syncChild chan error
|
syncChild chan error
|
||||||
grpcClient types.APIClient
|
grpcClient types.APIClient
|
||||||
eventFiltersMutex sync.Mutex
|
eventFiltersMutex sync.Mutex
|
||||||
eventFilters map[string]func(event *types.Event)
|
eventFilters map[string]func(event *types.Event)
|
||||||
|
lastEventTs uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// getClient returns a connection to the Suite containerd
|
// getClient returns a connection to the Suite containerd
|
||||||
|
@ -69,15 +73,18 @@ func (cs *ContainerdSuite) getClient(socket string) error {
|
||||||
// via `SetContainerEventFilter()`, it will be invoked every time an
|
// via `SetContainerEventFilter()`, it will be invoked every time an
|
||||||
// event for that id is received
|
// event for that id is received
|
||||||
func (cs *ContainerdSuite) ContainerdEventsHandler(events types.API_EventsClient) {
|
func (cs *ContainerdSuite) ContainerdEventsHandler(events types.API_EventsClient) {
|
||||||
timestamp := uint64(time.Now().Unix())
|
|
||||||
for {
|
for {
|
||||||
e, err := events.Recv()
|
e, err := events.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// If daemon died or exited, return
|
||||||
|
if strings.Contains(err.Error(), "transport is closing") {
|
||||||
|
break
|
||||||
|
}
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: timestamp})
|
events, _ = cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
timestamp = e.Timestamp
|
cs.lastEventTs = e.Timestamp
|
||||||
cs.eventFiltersMutex.Lock()
|
cs.eventFiltersMutex.Lock()
|
||||||
if f, ok := cs.eventFilters[e.Id]; ok {
|
if f, ok := cs.eventFilters[e.Id]; ok {
|
||||||
f(e)
|
f(e)
|
||||||
|
@ -97,6 +104,73 @@ func generateReferenceSpecs(destination string) error {
|
||||||
return specs.Run()
|
return specs.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs *ContainerdSuite) StopDaemon(kill bool) {
|
||||||
|
if cs.cd == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if kill {
|
||||||
|
cs.cd.Process.Kill()
|
||||||
|
<-cs.syncChild
|
||||||
|
cs.cd = nil
|
||||||
|
} else {
|
||||||
|
// Terminate gently if possible
|
||||||
|
cs.cd.Process.Signal(os.Interrupt)
|
||||||
|
|
||||||
|
done := false
|
||||||
|
for done == false {
|
||||||
|
select {
|
||||||
|
case err := <-cs.syncChild:
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("master containerd did not exit cleanly: %v\n", err)
|
||||||
|
}
|
||||||
|
done = true
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
fmt.Println("Timeout while waiting for containerd to exit, killing it!")
|
||||||
|
cs.cd.Process.Kill()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ContainerdSuite) RestartDaemon(kill bool) error {
|
||||||
|
cs.StopDaemon(kill)
|
||||||
|
|
||||||
|
cd := exec.Command("containerd", "--debug",
|
||||||
|
"--state-dir", cs.stateDir,
|
||||||
|
"--listen", cs.grpcSocket,
|
||||||
|
"--metrics-interval", "0m0s",
|
||||||
|
"--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cs.outputDir, "runc")),
|
||||||
|
)
|
||||||
|
cd.Stderr = cs.logFile
|
||||||
|
cd.Stdout = cs.logFile
|
||||||
|
|
||||||
|
if err := cd.Start(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cs.cd = cd
|
||||||
|
|
||||||
|
if err := cs.getClient(cs.grpcSocket); err != nil {
|
||||||
|
// Kill the daemon
|
||||||
|
cs.cd.Process.Kill()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Monitor events
|
||||||
|
events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{Timestamp: cs.lastEventTs})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go cs.ContainerdEventsHandler(events)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
cs.syncChild <- cd.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
|
func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
|
||||||
bundleMap = make(map[string]Bundle)
|
bundleMap = make(map[string]Bundle)
|
||||||
cs.eventFilters = make(map[string]func(event *types.Event))
|
cs.eventFilters = make(map[string]func(event *types.Event))
|
||||||
|
@ -122,14 +196,14 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create our output directory
|
// Create our output directory
|
||||||
od := fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000"))
|
cs.outputDir = fmt.Sprintf(outputDirFormat, time.Now().Format("2006-01-02_150405.000000"))
|
||||||
cdStateDir := fmt.Sprintf("%s/containerd-master", od)
|
cs.stateDir = filepath.Join(cs.outputDir, "containerd-master")
|
||||||
if err := os.MkdirAll(cdStateDir, 0755); err != nil {
|
if err := os.MkdirAll(cs.stateDir, 0755); err != nil {
|
||||||
c.Fatalf("Unable to created output directory '%s': %v", cdStateDir, err)
|
c.Fatalf("Unable to created output directory '%s': %v", cs.stateDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cdGRPCSock := filepath.Join(od, "containerd-master", "containerd.sock")
|
cs.grpcSocket = filepath.Join(cs.outputDir, "containerd-master", "containerd.sock")
|
||||||
cdLogFile := filepath.Join(od, "containerd-master", "containerd.log")
|
cdLogFile := filepath.Join(cs.outputDir, "containerd-master", "containerd.log")
|
||||||
|
|
||||||
f, err := os.OpenFile(cdLogFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR|os.O_SYNC, 0777)
|
f, err := os.OpenFile(cdLogFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR|os.O_SYNC, 0777)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -137,39 +211,8 @@ func (cs *ContainerdSuite) SetUpSuite(c *check.C) {
|
||||||
}
|
}
|
||||||
cs.logFile = f
|
cs.logFile = f
|
||||||
|
|
||||||
cd := exec.Command("containerd", "--debug",
|
|
||||||
"--state-dir", cdStateDir,
|
|
||||||
"--listen", cdGRPCSock,
|
|
||||||
"--metrics-interval", "0m0s",
|
|
||||||
"--runtime-args", fmt.Sprintf("--root=%s", filepath.Join(cs.cwd, cdStateDir, "runc")),
|
|
||||||
)
|
|
||||||
cd.Stderr = f
|
|
||||||
cd.Stdout = f
|
|
||||||
|
|
||||||
if err := cd.Start(); err != nil {
|
|
||||||
c.Fatalf("Unable to start the master containerd: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cs.outputDir = od
|
|
||||||
cs.cd = cd
|
|
||||||
cs.syncChild = make(chan error)
|
cs.syncChild = make(chan error)
|
||||||
if err := cs.getClient(cdGRPCSock); err != nil {
|
cs.RestartDaemon(false)
|
||||||
// Kill the daemon
|
|
||||||
cs.cd.Process.Kill()
|
|
||||||
c.Fatalf("Failed to connect to daemon: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Monitor events
|
|
||||||
events, err := cs.grpcClient.Events(context.Background(), &types.EventsRequest{})
|
|
||||||
if err != nil {
|
|
||||||
c.Fatalf("Could not register containerd event handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go cs.ContainerdEventsHandler(events)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
cs.syncChild <- cd.Wait()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContainerdSuite) TearDownSuite(c *check.C) {
|
func (cs *ContainerdSuite) TearDownSuite(c *check.C) {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/docker/containerd/api/grpc/types"
|
"github.com/docker/containerd/api/grpc/types"
|
||||||
|
@ -264,3 +265,23 @@ func (cs *ContainerdSuite) AddProcessToContainer(init *containerProcess, pid, cw
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerSorter struct {
|
||||||
|
c []*types.Container
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerSorter) Len() int {
|
||||||
|
return len(s.c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerSorter) Swap(i, j int) {
|
||||||
|
s.c[i], s.c[j] = s.c[j], s.c[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerSorter) Less(i, j int) bool {
|
||||||
|
return s.c[i].Id < s.c[j].Id
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortContainers(c []*types.Container) {
|
||||||
|
sort.Sort(&containerSorter{c})
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -311,3 +312,116 @@ func (cs *ContainerdSuite) TestStartBusyboxTopPauseResume(t *check.C) {
|
||||||
t.Assert(containers[0].Id, checker.Equals, "top")
|
t.Assert(containers[0].Id, checker.Equals, "top")
|
||||||
t.Assert(containers[0].Status, checker.Equals, "running")
|
t.Assert(containers[0].Status, checker.Equals, "running")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs *ContainerdSuite) TestRestart(t *check.C) {
|
||||||
|
bundleName := "busybox-top"
|
||||||
|
if err := CreateBusyboxBundle(bundleName, []string{"top"}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
totalCtr := 10
|
||||||
|
|
||||||
|
for i := 0; i < totalCtr; i++ {
|
||||||
|
containerId := fmt.Sprintf("top%d", i)
|
||||||
|
c, err := cs.StartContainer(containerId, bundleName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := c.GetNextEvent()
|
||||||
|
|
||||||
|
t.Assert(*e, checker.Equals, types.Event{
|
||||||
|
Type: "start-container",
|
||||||
|
Id: containerId,
|
||||||
|
Status: 0,
|
||||||
|
Pid: "",
|
||||||
|
Timestamp: e.Timestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// restart daemon gracefully (SIGINT)
|
||||||
|
cs.RestartDaemon(false)
|
||||||
|
|
||||||
|
// check that status is running
|
||||||
|
containers, err := cs.ListRunningContainers()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
sortContainers(containers)
|
||||||
|
t.Assert(len(containers), checker.Equals, totalCtr)
|
||||||
|
for i := 0; i < totalCtr; i++ {
|
||||||
|
t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i))
|
||||||
|
t.Assert(containers[i].Status, checker.Equals, "running")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now kill daemon (SIGKILL)
|
||||||
|
cs.StopDaemon(true)
|
||||||
|
|
||||||
|
// Sleep a second to allow thevent e timestamp to change since
|
||||||
|
// it's second based
|
||||||
|
<-time.After(3 * time.Second)
|
||||||
|
|
||||||
|
// Kill a couple of containers
|
||||||
|
killedCtr := map[int]bool{4: true, 2: true}
|
||||||
|
|
||||||
|
var f func(*types.Event)
|
||||||
|
deathChans := make([]chan error, len(killedCtr))
|
||||||
|
deathChansIdx := 0
|
||||||
|
|
||||||
|
for i, _ := range killedCtr {
|
||||||
|
ch := make(chan error, 1)
|
||||||
|
deathChans[deathChansIdx] = ch
|
||||||
|
deathChansIdx++
|
||||||
|
syscall.Kill(int(containers[i].Pids[0]), syscall.SIGKILL)
|
||||||
|
|
||||||
|
// Filter to be notified of their death
|
||||||
|
containerId := fmt.Sprintf("top%d", i)
|
||||||
|
f = func(event *types.Event) {
|
||||||
|
expectedEvent := types.Event{
|
||||||
|
Type: "exit",
|
||||||
|
Id: containerId,
|
||||||
|
Status: 137,
|
||||||
|
Pid: "init",
|
||||||
|
}
|
||||||
|
expectedEvent.Timestamp = event.Timestamp
|
||||||
|
if ok := t.Check(*event, checker.Equals, expectedEvent); !ok {
|
||||||
|
ch <- fmt.Errorf("Unexpected event: %#v", *event)
|
||||||
|
} else {
|
||||||
|
ch <- nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cs.SetContainerEventFilter(containerId, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
cs.RestartDaemon(true)
|
||||||
|
|
||||||
|
// Ensure we got our events
|
||||||
|
for i, _ := range deathChans {
|
||||||
|
done := false
|
||||||
|
for done == false {
|
||||||
|
select {
|
||||||
|
case err := <-deathChans[i]:
|
||||||
|
t.Assert(err, checker.Equals, nil)
|
||||||
|
done = true
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("Exit event for container not received after 3 seconds")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that status is running
|
||||||
|
containers, err = cs.ListRunningContainers()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
sortContainers(containers)
|
||||||
|
t.Assert(len(containers), checker.Equals, totalCtr-len(killedCtr))
|
||||||
|
idShift := 0
|
||||||
|
for i := 0; i < totalCtr-len(killedCtr); i++ {
|
||||||
|
if _, ok := killedCtr[i+idShift]; ok {
|
||||||
|
idShift++
|
||||||
|
}
|
||||||
|
t.Assert(containers[i].Id, checker.Equals, fmt.Sprintf("top%d", i+idShift))
|
||||||
|
t.Assert(containers[i].Status, checker.Equals, "running")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue