62 lines
1.2 KiB
Go
62 lines
1.2 KiB
Go
|
package events
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// Channel provides a sink that can be listened on. The writer and channel
|
||
|
// listener must operate in separate goroutines.
|
||
|
//
|
||
|
// Consumers should listen on Channel.C until Closed is closed.
|
||
|
type Channel struct {
|
||
|
C chan Event
|
||
|
|
||
|
closed chan struct{}
|
||
|
once sync.Once
|
||
|
}
|
||
|
|
||
|
// NewChannel returns a channel. If buffer is zero, the channel is
|
||
|
// unbuffered.
|
||
|
func NewChannel(buffer int) *Channel {
|
||
|
return &Channel{
|
||
|
C: make(chan Event, buffer),
|
||
|
closed: make(chan struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Done returns a channel that will always proceed once the sink is closed.
|
||
|
func (ch *Channel) Done() chan struct{} {
|
||
|
return ch.closed
|
||
|
}
|
||
|
|
||
|
// Write the event to the channel. Must be called in a separate goroutine from
|
||
|
// the listener.
|
||
|
func (ch *Channel) Write(event Event) error {
|
||
|
select {
|
||
|
case ch.C <- event:
|
||
|
return nil
|
||
|
case <-ch.closed:
|
||
|
return ErrSinkClosed
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close the channel sink.
|
||
|
func (ch *Channel) Close() error {
|
||
|
ch.once.Do(func() {
|
||
|
close(ch.closed)
|
||
|
})
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ch *Channel) String() string {
|
||
|
// Serialize a copy of the Channel that doesn't contain the sync.Once,
|
||
|
// to avoid a data race.
|
||
|
ch2 := map[string]interface{}{
|
||
|
"C": ch.C,
|
||
|
"closed": ch.closed,
|
||
|
}
|
||
|
return fmt.Sprint(ch2)
|
||
|
}
|