package notifications

import (
	"container/list"
	"fmt"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
)

// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. Albeit,
// the tight integration with endpoint metrics should be removed.

// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
	sinks  []Sink
	events chan []Event
	closed chan chan struct{}
}

// NewBroadcaster ...
// Add appends one or more sinks to the list of sinks. The broadcaster
// behavior will be affected by the properties of the sink. Generally, the
// sink should accept all messages and deal with reliability on its own. Use
// of EventQueue and RetryingSink should be used here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
	b := Broadcaster{
		sinks:  sinks,
		events: make(chan []Event),
		closed: make(chan chan struct{}),
	}

	// Start the broadcaster
	go b.run()

	return &b
}

// Write accepts a block of events to be dispatched to all sinks. This method
// will never fail and should never block (hopefully!). The caller cedes the
// slice memory to the broadcaster and should not modify it after calling
// write.
func (b *Broadcaster) Write(events ...Event) error {
	select {
	case b.events <- events:
	case <-b.closed:
		return ErrSinkClosed
	}
	return nil
}

// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
	logrus.Infof("broadcaster: closing")
	select {
	case <-b.closed:
		// already closed
		return fmt.Errorf("broadcaster: already closed")
	default:
		// do a little chan handoff dance to synchronize closing
		closed := make(chan struct{})
		b.closed <- closed
		close(b.closed)
		<-closed
		return nil
	}
}

// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
	for {
		select {
		case block := <-b.events:
			for _, sink := range b.sinks {
				if err := sink.Write(block...); err != nil {
					logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
				}
			}
		case closing := <-b.closed:

			// close all the underlying sinks
			for _, sink := range b.sinks {
				if err := sink.Close(); err != nil {
					logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
				}
			}
			closing <- struct{}{}

			logrus.Debugf("broadcaster: closed")
			return
		}
	}
}

// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
	sink      Sink
	events    *list.List
	listeners []eventQueueListener
	cond      *sync.Cond
	mu        sync.Mutex
	closed    bool
}

// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
	ingress(events ...Event)
	egress(events ...Event)
}

// newEventQueue returns a queue to the provided sink. If the updater is non-
// nil, it will be called to update pending metrics on ingress and egress.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
	eq := eventQueue{
		sink:      sink,
		events:    list.New(),
		listeners: listeners,
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// beend closed.
func (eq *eventQueue) Write(events ...Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return ErrSinkClosed
	}

	for _, listener := range eq.listeners {
		listener.ingress(events...)
	}
	eq.events.PushBack(events)
	eq.cond.Signal() // signal waiters

	return nil
}

// Close shutsdown the event queue, flushing
func (eq *eventQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return fmt.Errorf("eventqueue: already closed")
	}

	// set closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush

	return eq.sink.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
	for {
		block := eq.next()

		if block == nil {
			return // nil block means event queue is closed.
		}

		if err := eq.sink.Write(block...); err != nil {
			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
		}

		for _, listener := range eq.listeners {
			listener.egress(block...)
		}
	}
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	block := front.Value.([]Event)
	eq.events.Remove(front)

	return block
}

// retryingSink retries the write until success or an ErrSinkClosed is
// returned. Underlying sink must have p > 0 of succeeding or the sink will
// block. Internally, it is a circuit breaker retries to manage reset.
// Concurrent calls to a retrying sink are serialized through the sink,
// meaning that if one is in-flight, another will not proceed.
type retryingSink struct {
	mu     sync.Mutex
	sink   Sink
	closed bool

	// circuit breaker heuristics
	failures struct {
		threshold int
		recent    int
		last      time.Time
		backoff   time.Duration // time after which we retry after failure.
	}
}

type retryingSinkListener interface {
	active(events ...Event)
	retry(events ...Event)
}

// TODO(stevvooe): We are using circuit break here, which actually doesn't
// make a whole lot of sense for this use case, since we always retry. Move
// this to use bounded exponential backoff.

// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
	rs := &retryingSink{
		sink: sink,
	}
	rs.failures.threshold = threshold
	rs.failures.backoff = backoff

	return rs
}

// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

retry:

	if rs.closed {
		return ErrSinkClosed
	}

	if !rs.proceed() {
		logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
		rs.wait(rs.failures.backoff)
		goto retry
	}

	if err := rs.write(events...); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
		goto retry
	}

	return nil
}

// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	if rs.closed {
		return fmt.Errorf("retryingsink: already closed")
	}

	rs.closed = true
	return rs.sink.Close()
}

// write provides a helper that dispatches failure and success properly. Used
// by write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
	if err := rs.sink.Write(events...); err != nil {
		rs.failure()
		return err
	}

	rs.reset()
	return nil
}

// wait backoff time against the sink, unlocking so others can proceed. Should
// only be called by methods that currently have the mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
	rs.mu.Unlock()
	defer rs.mu.Lock()

	// backoff here
	time.Sleep(backoff)
}

// reset marks a successful call.
func (rs *retryingSink) reset() {
	rs.failures.recent = 0
	rs.failures.last = time.Time{}
}

// failure records a failure.
func (rs *retryingSink) failure() {
	rs.failures.recent++
	rs.failures.last = time.Now().UTC()
}

// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
	return rs.failures.recent < rs.failures.threshold ||
		time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}