Add journal queue
This commit is contained in:
parent
0136213e78
commit
18338b29a1
|
@ -217,7 +217,6 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {
|
||||||
e := containerd.NewEvent(containerd.StartContainerEventType)
|
e := containerd.NewEvent(containerd.StartContainerEventType)
|
||||||
e.ID = id
|
e.ID = id
|
||||||
e.BundlePath = c.BundlePath
|
e.BundlePath = c.BundlePath
|
||||||
logrus.Debug(c.Stderr, c.Stdout)
|
|
||||||
e.Stdio = &containerd.Stdio{
|
e.Stdio = &containerd.Stdio{
|
||||||
Stderr: c.Stderr,
|
Stderr: c.Stderr,
|
||||||
Stdout: c.Stdout,
|
Stdout: c.Stdout,
|
||||||
|
|
28
journal.go
28
journal.go
|
@ -4,6 +4,8 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type entry struct {
|
type entry struct {
|
||||||
|
@ -18,24 +20,38 @@ func newJournal(path string) (*journal, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &journal{
|
j := &journal{
|
||||||
f: f,
|
f: f,
|
||||||
enc: json.NewEncoder(f),
|
enc: json.NewEncoder(f),
|
||||||
}, nil
|
wc: make(chan *Event, 2048),
|
||||||
|
}
|
||||||
|
go j.start()
|
||||||
|
return j, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type journal struct {
|
type journal struct {
|
||||||
f *os.File
|
f *os.File
|
||||||
enc *json.Encoder
|
enc *json.Encoder
|
||||||
|
wc chan *Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *journal) write(e *Event) error {
|
func (j *journal) start() {
|
||||||
et := &entry{
|
for e := range j.wc {
|
||||||
Event: e,
|
et := &entry{
|
||||||
|
Event: e,
|
||||||
|
}
|
||||||
|
if err := j.enc.Encode(et); err != nil {
|
||||||
|
logrus.WithField("error", err).Error("write event to journal")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return j.enc.Encode(et)
|
}
|
||||||
|
|
||||||
|
func (j *journal) write(e *Event) {
|
||||||
|
j.wc <- e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *journal) Close() error {
|
func (j *journal) Close() error {
|
||||||
|
// TODO: add waitgroup to make sure journal is flushed
|
||||||
|
close(j.wc)
|
||||||
return j.f.Close()
|
return j.f.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
"github.com/opencontainers/runc/libcontainer"
|
"github.com/opencontainers/runc/libcontainer"
|
||||||
"github.com/opencontainers/runc/libcontainer/configs"
|
"github.com/opencontainers/runc/libcontainer/configs"
|
||||||
_ "github.com/opencontainers/runc/libcontainer/nsenter"
|
_ "github.com/opencontainers/runc/libcontainer/nsenter"
|
||||||
|
@ -341,7 +340,6 @@ func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *Stdio) (*libcon
|
||||||
)
|
)
|
||||||
if stdio != nil {
|
if stdio != nil {
|
||||||
if stdio.Stdout != "" {
|
if stdio.Stdout != "" {
|
||||||
logrus.Debug("adding stdout")
|
|
||||||
f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755)
|
f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -72,9 +72,7 @@ func (s *Supervisor) Start(events chan *Event) error {
|
||||||
s.events = events
|
s.events = events
|
||||||
go func() {
|
go func() {
|
||||||
for e := range events {
|
for e := range events {
|
||||||
if err := s.journal.write(e); err != nil {
|
s.journal.write(e)
|
||||||
logrus.WithField("error", err).Error("write journal entry")
|
|
||||||
}
|
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
case ExitEventType:
|
case ExitEventType:
|
||||||
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
|
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
|
||||||
|
|
Loading…
Reference in New Issue