events: implemenet transaction event API
With this changeset, we provide an implementation for a transaction event API. The goal is to support the emission of events with transactions and sub-transactions, allowing a process post events that may be rolled back later due to an error in the entire process. When journaled, a consumer will be able to commit and rollback with the same behavior as the producer. Events are left to full definition by each component. We may require more structure in the future to ensure consistency but we need more use case before making decisions in that direction. Events may be organized by a topic. A topic defines a single stream of messages, that could be associated with a specific component. The topic defines a grouped stream that may be compacted as one. Only the contextual API is implemented here. After using, we will make a determination to see how useful this across the board. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
bde30191f4
commit
e620833c9e
5 changed files with 226 additions and 0 deletions
9
events/events.go
Normal file
9
events/events.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package events
|
||||
|
||||
type Event interface{}
|
||||
|
||||
type Envelope struct {
|
||||
Tx int64 `json:",omitempty"`
|
||||
Topic string
|
||||
Event interface{}
|
||||
}
|
33
events/events_test.go
Normal file
33
events/events_test.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBasicEvent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// simulate a layer pull with events
|
||||
ctx, commit, _ := WithTx(ctx)
|
||||
|
||||
G(ctx).Post("pull ubuntu")
|
||||
|
||||
for layer := 0; layer < 4; layer++ {
|
||||
// make a subtransaction for each layer
|
||||
ctx, commit, _ := WithTx(ctx)
|
||||
|
||||
G(ctx).Post(fmt.Sprintf("fetch layer %v", layer))
|
||||
|
||||
ctx = WithTopic(ctx, "content")
|
||||
// simulate sub-operations with a separate topic, on the content store
|
||||
G(ctx).Post(fmt.Sprintf("received sha:256"))
|
||||
|
||||
G(ctx).Post(fmt.Sprintf("unpack layer %v", layer))
|
||||
|
||||
commit()
|
||||
}
|
||||
|
||||
commit()
|
||||
}
|
55
events/poster.go
Normal file
55
events/poster.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/containerd/log"
|
||||
)
|
||||
|
||||
var (
|
||||
G = GetPoster
|
||||
)
|
||||
|
||||
// Poster posts the event.
|
||||
type Poster interface {
|
||||
Post(event Event)
|
||||
}
|
||||
|
||||
type posterKey struct{}
|
||||
|
||||
func GetPoster(ctx context.Context) Poster {
|
||||
poster := ctx.Value(ctx)
|
||||
if poster == nil {
|
||||
logger := log.G(ctx)
|
||||
tx, _ := getTx(ctx)
|
||||
topic := getTopic(ctx)
|
||||
|
||||
// likely means we don't have a configured event system. Just return
|
||||
// the default poster, which merely logs events.
|
||||
return posterFunc(func(event Event) {
|
||||
fields := logrus.Fields{"event": event}
|
||||
|
||||
if topic != "" {
|
||||
fields["topic"] = topic
|
||||
}
|
||||
|
||||
if tx != nil {
|
||||
fields["tx.id"] = tx.id
|
||||
if tx.parent != nil {
|
||||
fields["tx.parent.id"] = tx.parent.id
|
||||
}
|
||||
}
|
||||
|
||||
logger.WithFields(fields).Info("event posted")
|
||||
})
|
||||
}
|
||||
|
||||
return poster.(Poster)
|
||||
}
|
||||
|
||||
type posterFunc func(event Event)
|
||||
|
||||
func (fn posterFunc) Post(event Event) {
|
||||
fn(event)
|
||||
}
|
35
events/topic.go
Normal file
35
events/topic.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package events
|
||||
|
||||
import "context"
|
||||
|
||||
type topicKey struct{}
|
||||
|
||||
// WithTopic returns a context with a new topic set, such that events emmited
|
||||
// from the resulting context will be marked with the topic.
|
||||
//
|
||||
// A topic groups events by the target module they operate on. This is
|
||||
// primarily designed to support multi-module log compaction of events. In
|
||||
// typical journaling systems, the entries operate on a single data structure.
|
||||
// When compacting the journal, we can replace all former log entries with a
|
||||
// summary data structure that will result in the same state.
|
||||
//
|
||||
// By providing a compaction mechansim by topic, we can prune down to a data
|
||||
// structure oriented towards a single topic, leaving unrelated messages alone.
|
||||
func WithTopic(ctx context.Context, topic string) context.Context {
|
||||
return context.WithValue(ctx, topicKey{}, topic)
|
||||
}
|
||||
|
||||
func getTopic(ctx context.Context) string {
|
||||
topic := ctx.Value(topicKey{})
|
||||
|
||||
if topic == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return topic.(string)
|
||||
}
|
||||
|
||||
// RegisterCompactor sets the compacter for the given topic.
|
||||
func RegisterCompactor(topic string, compactor interface{}) {
|
||||
panic("not implemented")
|
||||
}
|
94
events/transaction.go
Normal file
94
events/transaction.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var txCounter int64 // replace this with something that won't break
|
||||
|
||||
// nextTXID provides the next transaction identifier.
|
||||
func nexttxID() int64 {
|
||||
// TODO(stevvooe): Need to coordinate this with existing transaction logs.
|
||||
// For now, this is a toy, but not a racy one.
|
||||
return atomic.AddInt64(&txCounter, 1)
|
||||
}
|
||||
|
||||
type transaction struct {
|
||||
id int64
|
||||
parent *transaction // if nil, no parent transaction
|
||||
finish sync.Once
|
||||
start, end time.Time // informational
|
||||
}
|
||||
|
||||
// begin creates a sub-transaction.
|
||||
func (tx *transaction) begin(poster Poster) *transaction {
|
||||
id := nexttxID()
|
||||
|
||||
child := &transaction{
|
||||
id: id,
|
||||
parent: tx,
|
||||
start: time.Now(),
|
||||
}
|
||||
|
||||
// post the transaction started event
|
||||
poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events
|
||||
|
||||
return child
|
||||
}
|
||||
|
||||
// commit writes out the transaction.
|
||||
func (tx *transaction) commit(poster Poster) {
|
||||
tx.finish.Do(func() {
|
||||
tx.end = time.Now()
|
||||
poster.Post(tx.makeTransactionEvent("commit"))
|
||||
})
|
||||
}
|
||||
|
||||
func (tx *transaction) rollback(poster Poster, cause error) {
|
||||
tx.finish.Do(func() {
|
||||
tx.end = time.Now()
|
||||
event := tx.makeTransactionEvent("rollback")
|
||||
event = fmt.Sprintf("%s error=%q", event, cause.Error())
|
||||
poster.Post(event)
|
||||
})
|
||||
}
|
||||
|
||||
func (tx *transaction) makeTransactionEvent(status string) Event {
|
||||
// TODO(stevvooe): obviously, we need more structure than this.
|
||||
event := fmt.Sprintf("%v %v", status, tx.id)
|
||||
if tx.parent != nil {
|
||||
event += " parent=" + fmt.Sprint(tx.parent.id)
|
||||
}
|
||||
|
||||
return event
|
||||
}
|
||||
|
||||
type txKey struct{}
|
||||
|
||||
func getTx(ctx context.Context) (*transaction, bool) {
|
||||
tx := ctx.Value(txKey{})
|
||||
if tx == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return tx.(*transaction), true
|
||||
}
|
||||
|
||||
// WithTx returns a new context with an event transaction, such that events
|
||||
// posted to the underlying context will be committed to the event log as a
|
||||
// group, organized by a transaction id, when commit is called.
|
||||
func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) {
|
||||
poster := G(pctx)
|
||||
parent, _ := getTx(pctx)
|
||||
tx := parent.begin(poster)
|
||||
|
||||
return context.WithValue(pctx, txKey{}, tx), func() {
|
||||
tx.commit(poster)
|
||||
}, func(err error) {
|
||||
tx.rollback(poster, err)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue