Sync container start
This commit is contained in:
		
							parent
							
								
									e81513349f
								
							
						
					
					
						commit
						86ec7e8fd2
					
				
					 7 changed files with 50 additions and 81 deletions
				
			
		|  | @ -2,6 +2,7 @@ package containerd | |||
| 
 | ||||
| type Container interface { | ||||
| 	ID() string | ||||
| 	Start() error | ||||
| 	Pid() (int, error) | ||||
| 	SetExited(status int) | ||||
| 	Delete() error | ||||
|  |  | |||
|  | @ -43,7 +43,7 @@ var DaemonCommand = cli.Command{ | |||
| 			}() | ||||
| 			go metrics.Log(metrics.DefaultRegistry, 60*time.Second, l) | ||||
| 		} | ||||
| 		if err := daemon(context.String("state-dir"), 20, context.Int("buffer-size")); err != nil { | ||||
| 		if err := daemon(context.String("state-dir"), 10, context.Int("buffer-size")); err != nil { | ||||
| 			logrus.Fatal(err) | ||||
| 		} | ||||
| 	}, | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ var ( | |||
| 	ErrContainerExists   = errors.New("containerd: container already exists") | ||||
| 
 | ||||
| 	// Internal errors | ||||
| 	errShutdown   = errors.New("containerd: supervisor is shutdown") | ||||
| 	errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") | ||||
| 	errShutdown          = errors.New("containerd: supervisor is shutdown") | ||||
| 	errRootNotAbs        = errors.New("containerd: rootfs path is not an absolute path") | ||||
| 	errNoContainerForPid = errors.New("containerd: pid not registered for any container") | ||||
| ) | ||||
|  |  | |||
							
								
								
									
										9
									
								
								event.go
									
										
									
									
									
								
							
							
						
						
									
										9
									
								
								event.go
									
										
									
									
									
								
							|  | @ -18,15 +18,6 @@ func (e *ExitEvent) String() string { | |||
| 	return "exit event" | ||||
| } | ||||
| 
 | ||||
| type StartedEvent struct { | ||||
| 	ID        string | ||||
| 	Container Container | ||||
| } | ||||
| 
 | ||||
| func (s *StartedEvent) String() string { | ||||
| 	return "started event" | ||||
| } | ||||
| 
 | ||||
| type CreateContainerEvent struct { | ||||
| 	ID         string | ||||
| 	BundlePath string | ||||
|  |  | |||
							
								
								
									
										10
									
								
								jobs.go
									
										
									
									
									
								
							
							
						
						
									
										10
									
								
								jobs.go
									
										
									
									
									
								
							|  | @ -1,10 +0,0 @@ | |||
| package containerd | ||||
| 
 | ||||
| type Job interface { | ||||
| } | ||||
| 
 | ||||
| type CreateJob struct { | ||||
| 	ID         string | ||||
| 	BundlePath string | ||||
| 	Err        chan error | ||||
| } | ||||
|  | @ -173,6 +173,10 @@ func (c *libcontainerContainer) Pid() (int, error) { | |||
| 	return c.initProcess.Pid() | ||||
| } | ||||
| 
 | ||||
| func (c *libcontainerContainer) Start() error { | ||||
| 	return c.c.Start(c.initProcess) | ||||
| } | ||||
| 
 | ||||
| func (c *libcontainerContainer) SetExited(status int) { | ||||
| 	c.exitStatus = status | ||||
| 	// meh | ||||
|  | @ -222,10 +226,6 @@ func (r *libcontainerRuntime) Create(id, bundlePath string) (Container, error) { | |||
| 		c:           container, | ||||
| 		initProcess: process, | ||||
| 	} | ||||
| 	if err := container.Start(process); err != nil { | ||||
| 		container.Destroy() | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return c, nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,10 +2,10 @@ package containerd | |||
| 
 | ||||
| import ( | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/opencontainers/runc/libcontainer" | ||||
| 	"github.com/rcrowley/go-metrics" | ||||
| ) | ||||
| 
 | ||||
|  | @ -26,14 +26,8 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { | |||
| 	} | ||||
| 	s := &Supervisor{ | ||||
| 		stateDir:   stateDir, | ||||
| 		processes:  make(map[int]Container), | ||||
| 		containers: make(map[string]Container), | ||||
| 		runtime:    runtime, | ||||
| 		jobs:       make(chan Job, 1024), | ||||
| 	} | ||||
| 	for i := 0; i < concurrency; i++ { | ||||
| 		s.workerGroup.Add(1) | ||||
| 		go s.worker(i) | ||||
| 	} | ||||
| 	return s, nil | ||||
| } | ||||
|  | @ -42,15 +36,11 @@ type Supervisor struct { | |||
| 	// stateDir is the directory on the system to store container runtime state information. | ||||
| 	stateDir string | ||||
| 
 | ||||
| 	processes  map[int]Container | ||||
| 	containers map[string]Container | ||||
| 
 | ||||
| 	runtime Runtime | ||||
| 
 | ||||
| 	events chan Event | ||||
| 	jobs   chan Job | ||||
| 
 | ||||
| 	workerGroup sync.WaitGroup | ||||
| } | ||||
| 
 | ||||
| // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and | ||||
|  | @ -71,35 +61,56 @@ func (s *Supervisor) Start(events chan Event) error { | |||
| 					"pid":    e.Pid, | ||||
| 					"status": e.Status, | ||||
| 				}).Debug("containerd: process exited") | ||||
| 				if container, ok := s.processes[e.Pid]; ok { | ||||
| 					container.SetExited(e.Status) | ||||
| 					delete(s.processes, e.Pid) | ||||
| 					delete(s.containers, container.ID()) | ||||
| 					if err := container.Delete(); err != nil { | ||||
| 						logrus.WithField("error", err).Error("containerd: deleting container") | ||||
| 					} | ||||
| 				} | ||||
| 			case *StartedEvent: | ||||
| 				s.containers[e.ID] = e.Container | ||||
| 				pid, err := e.Container.Pid() | ||||
| 				container, err := s.getContainerForPid(e.Pid) | ||||
| 				if err != nil { | ||||
| 					logrus.WithField("error", err).Error("containerd: getting container pid") | ||||
| 					if err != errNoContainerForPid { | ||||
| 						logrus.WithField("error", err).Error("containerd: find container for pid") | ||||
| 					} | ||||
| 					continue | ||||
| 				} | ||||
| 				s.processes[pid] = e.Container | ||||
| 			case *CreateContainerEvent: | ||||
| 				j := &CreateJob{ | ||||
| 					ID:         e.ID, | ||||
| 					BundlePath: e.BundlePath, | ||||
| 					Err:        e.Err, | ||||
| 				container.SetExited(e.Status) | ||||
| 				delete(s.containers, container.ID()) | ||||
| 				if err := container.Delete(); err != nil { | ||||
| 					logrus.WithField("error", err).Error("containerd: deleting container") | ||||
| 				} | ||||
| 				s.jobs <- j | ||||
| 			case *CreateContainerEvent: | ||||
| 				start := time.Now() | ||||
| 				container, err := s.runtime.Create(e.ID, e.BundlePath) | ||||
| 				if err != nil { | ||||
| 					e.Err <- err | ||||
| 					continue | ||||
| 				} | ||||
| 				s.containers[e.ID] = container | ||||
| 				if err := container.Start(); err != nil { | ||||
| 					e.Err <- err | ||||
| 					continue | ||||
| 				} | ||||
| 				e.Err <- nil | ||||
| 				containerStartTimer.UpdateSince(start) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (s *Supervisor) getContainerForPid(pid int) (Container, error) { | ||||
| 	for _, container := range s.containers { | ||||
| 		cpid, err := container.Pid() | ||||
| 		if err != nil { | ||||
| 			if lerr, ok := err.(libcontainer.Error); ok { | ||||
| 				if lerr.Code() == libcontainer.ProcessNotExecuted { | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
| 			logrus.WithField("error", err).Error("containerd: get container pid") | ||||
| 		} | ||||
| 		if pid == cpid { | ||||
| 			return container, nil | ||||
| 		} | ||||
| 	} | ||||
| 	return nil, errNoContainerForPid | ||||
| } | ||||
| 
 | ||||
| func (s *Supervisor) SendEvent(evt Event) { | ||||
| 	s.events <- evt | ||||
| } | ||||
|  | @ -108,28 +119,3 @@ func (s *Supervisor) SendEvent(evt Event) { | |||
| func (s *Supervisor) Stop() { | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (s *Supervisor) worker(id int) { | ||||
| 	defer func() { | ||||
| 		s.workerGroup.Done() | ||||
| 		logrus.WithField("worker", id).Debug("containerd: worker finished") | ||||
| 	}() | ||||
| 	logrus.WithField("worker", id).Debug("containerd: starting worker") | ||||
| 	for job := range s.jobs { | ||||
| 		switch j := job.(type) { | ||||
| 		case *CreateJob: | ||||
| 			start := time.Now() | ||||
| 			container, err := s.runtime.Create(j.ID, j.BundlePath) | ||||
| 			if err != nil { | ||||
| 				j.Err <- err | ||||
| 				continue | ||||
| 			} | ||||
| 			s.SendEvent(&StartedEvent{ | ||||
| 				ID:        j.ID, | ||||
| 				Container: container, | ||||
| 			}) | ||||
| 			j.Err <- nil | ||||
| 			containerStartTimer.UpdateSince(start) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue