diff --git a/events/events.go b/events/events.go new file mode 100644 index 0000000..51e6957 --- /dev/null +++ b/events/events.go @@ -0,0 +1,9 @@ +package events + +type Event interface{} + +type Envelope struct { + Tx int64 `json:",omitempty"` + Topic string + Event interface{} +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 0000000..8b8acce --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,33 @@ +package events + +import ( + "context" + "fmt" + "testing" +) + +func TestBasicEvent(t *testing.T) { + ctx := context.Background() + + // simulate a layer pull with events + ctx, commit, _ := WithTx(ctx) + + G(ctx).Post("pull ubuntu") + + for layer := 0; layer < 4; layer++ { + // make a subtransaction for each layer + ctx, commit, _ := WithTx(ctx) + + G(ctx).Post(fmt.Sprintf("fetch layer %v", layer)) + + ctx = WithTopic(ctx, "content") + // simulate sub-operations with a separate topic, on the content store + G(ctx).Post(fmt.Sprintf("received sha:256")) + + G(ctx).Post(fmt.Sprintf("unpack layer %v", layer)) + + commit() + } + + commit() +} diff --git a/events/poster.go b/events/poster.go new file mode 100644 index 0000000..170f899 --- /dev/null +++ b/events/poster.go @@ -0,0 +1,55 @@ +package events + +import ( + "context" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd/log" +) + +var ( + G = GetPoster +) + +// Poster posts the event. +type Poster interface { + Post(event Event) +} + +type posterKey struct{} + +func GetPoster(ctx context.Context) Poster { + poster := ctx.Value(ctx) + if poster == nil { + logger := log.G(ctx) + tx, _ := getTx(ctx) + topic := getTopic(ctx) + + // likely means we don't have a configured event system. Just return + // the default poster, which merely logs events. + return posterFunc(func(event Event) { + fields := logrus.Fields{"event": event} + + if topic != "" { + fields["topic"] = topic + } + + if tx != nil { + fields["tx.id"] = tx.id + if tx.parent != nil { + fields["tx.parent.id"] = tx.parent.id + } + } + + logger.WithFields(fields).Info("event posted") + }) + } + + return poster.(Poster) +} + +type posterFunc func(event Event) + +func (fn posterFunc) Post(event Event) { + fn(event) +} diff --git a/events/topic.go b/events/topic.go new file mode 100644 index 0000000..053ef9d --- /dev/null +++ b/events/topic.go @@ -0,0 +1,35 @@ +package events + +import "context" + +type topicKey struct{} + +// WithTopic returns a context with a new topic set, such that events emmited +// from the resulting context will be marked with the topic. +// +// A topic groups events by the target module they operate on. This is +// primarily designed to support multi-module log compaction of events. In +// typical journaling systems, the entries operate on a single data structure. +// When compacting the journal, we can replace all former log entries with a +// summary data structure that will result in the same state. +// +// By providing a compaction mechansim by topic, we can prune down to a data +// structure oriented towards a single topic, leaving unrelated messages alone. +func WithTopic(ctx context.Context, topic string) context.Context { + return context.WithValue(ctx, topicKey{}, topic) +} + +func getTopic(ctx context.Context) string { + topic := ctx.Value(topicKey{}) + + if topic == nil { + return "" + } + + return topic.(string) +} + +// RegisterCompactor sets the compacter for the given topic. +func RegisterCompactor(topic string, compactor interface{}) { + panic("not implemented") +} diff --git a/events/transaction.go b/events/transaction.go new file mode 100644 index 0000000..c1615eb --- /dev/null +++ b/events/transaction.go @@ -0,0 +1,94 @@ +package events + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" +) + +var txCounter int64 // replace this with something that won't break + +// nextTXID provides the next transaction identifier. +func nexttxID() int64 { + // TODO(stevvooe): Need to coordinate this with existing transaction logs. + // For now, this is a toy, but not a racy one. + return atomic.AddInt64(&txCounter, 1) +} + +type transaction struct { + id int64 + parent *transaction // if nil, no parent transaction + finish sync.Once + start, end time.Time // informational +} + +// begin creates a sub-transaction. +func (tx *transaction) begin(poster Poster) *transaction { + id := nexttxID() + + child := &transaction{ + id: id, + parent: tx, + start: time.Now(), + } + + // post the transaction started event + poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events + + return child +} + +// commit writes out the transaction. +func (tx *transaction) commit(poster Poster) { + tx.finish.Do(func() { + tx.end = time.Now() + poster.Post(tx.makeTransactionEvent("commit")) + }) +} + +func (tx *transaction) rollback(poster Poster, cause error) { + tx.finish.Do(func() { + tx.end = time.Now() + event := tx.makeTransactionEvent("rollback") + event = fmt.Sprintf("%s error=%q", event, cause.Error()) + poster.Post(event) + }) +} + +func (tx *transaction) makeTransactionEvent(status string) Event { + // TODO(stevvooe): obviously, we need more structure than this. + event := fmt.Sprintf("%v %v", status, tx.id) + if tx.parent != nil { + event += " parent=" + fmt.Sprint(tx.parent.id) + } + + return event +} + +type txKey struct{} + +func getTx(ctx context.Context) (*transaction, bool) { + tx := ctx.Value(txKey{}) + if tx == nil { + return nil, false + } + + return tx.(*transaction), true +} + +// WithTx returns a new context with an event transaction, such that events +// posted to the underlying context will be committed to the event log as a +// group, organized by a transaction id, when commit is called. +func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { + poster := G(pctx) + parent, _ := getTx(pctx) + tx := parent.begin(poster) + + return context.WithValue(pctx, txKey{}, tx), func() { + tx.commit(poster) + }, func(err error) { + tx.rollback(poster, err) + } +}