*: initial update to kube 1.8
Signed-off-by: Antonio Murdaca <runcom@redhat.com>
This commit is contained in:
parent
2453222695
commit
d6e819133d
1237 changed files with 84117 additions and 564982 deletions
5
vendor/k8s.io/client-go/tools/cache/controller.go
generated
vendored
5
vendor/k8s.io/client-go/tools/cache/controller.go
generated
vendored
|
@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
|
|||
c.reflector = r
|
||||
c.reflectorMutex.Unlock()
|
||||
|
||||
r.RunUntil(stopCh)
|
||||
var wg wait.Group
|
||||
defer wg.Wait()
|
||||
|
||||
wg.StartWithChannel(stopCh, r.Run)
|
||||
|
||||
wait.Until(c.processLoop, time.Second, stopCh)
|
||||
}
|
||||
|
|
323
vendor/k8s.io/client-go/tools/cache/heap.go
generated
vendored
Normal file
323
vendor/k8s.io/client-go/tools/cache/heap.go
generated
vendored
Normal file
|
@ -0,0 +1,323 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file implements a heap data structure.
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
closedMsg = "heap is closed"
|
||||
)
|
||||
|
||||
type LessFunc func(interface{}, interface{}) bool
|
||||
type heapItem struct {
|
||||
obj interface{} // The object which is stored in the heap.
|
||||
index int // The index of the object's key in the Heap.queue.
|
||||
}
|
||||
|
||||
type itemKeyValue struct {
|
||||
key string
|
||||
obj interface{}
|
||||
}
|
||||
|
||||
// heapData is an internal struct that implements the standard heap interface
|
||||
// and keeps the data stored in the heap.
|
||||
type heapData struct {
|
||||
// items is a map from key of the objects to the objects and their index.
|
||||
// We depend on the property that items in the map are in the queue and vice versa.
|
||||
items map[string]*heapItem
|
||||
// queue implements a heap data structure and keeps the order of elements
|
||||
// according to the heap invariant. The queue keeps the keys of objects stored
|
||||
// in "items".
|
||||
queue []string
|
||||
|
||||
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
// lessFunc is used to compare two objects in the heap.
|
||||
lessFunc LessFunc
|
||||
}
|
||||
|
||||
var (
|
||||
_ = heap.Interface(&heapData{}) // heapData is a standard heap
|
||||
)
|
||||
|
||||
// Less compares two objects and returns true if the first one should go
|
||||
// in front of the second one in the heap.
|
||||
func (h *heapData) Less(i, j int) bool {
|
||||
if i > len(h.queue) || j > len(h.queue) {
|
||||
return false
|
||||
}
|
||||
itemi, ok := h.items[h.queue[i]]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
itemj, ok := h.items[h.queue[j]]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return h.lessFunc(itemi.obj, itemj.obj)
|
||||
}
|
||||
|
||||
// Len returns the number of items in the Heap.
|
||||
func (h *heapData) Len() int { return len(h.queue) }
|
||||
|
||||
// Swap implements swapping of two elements in the heap. This is a part of standard
|
||||
// heap interface and should never be called directly.
|
||||
func (h *heapData) Swap(i, j int) {
|
||||
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
|
||||
item := h.items[h.queue[i]]
|
||||
item.index = i
|
||||
item = h.items[h.queue[j]]
|
||||
item.index = j
|
||||
}
|
||||
|
||||
// Push is supposed to be called by heap.Push only.
|
||||
func (h *heapData) Push(kv interface{}) {
|
||||
keyValue := kv.(*itemKeyValue)
|
||||
n := len(h.queue)
|
||||
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
|
||||
h.queue = append(h.queue, keyValue.key)
|
||||
}
|
||||
|
||||
// Pop is supposed to be called by heap.Pop only.
|
||||
func (h *heapData) Pop() interface{} {
|
||||
key := h.queue[len(h.queue)-1]
|
||||
h.queue = h.queue[0 : len(h.queue)-1]
|
||||
item, ok := h.items[key]
|
||||
if !ok {
|
||||
// This is an error
|
||||
return nil
|
||||
}
|
||||
delete(h.items, key)
|
||||
return item.obj
|
||||
}
|
||||
|
||||
// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
|
||||
// It can be used to implement priority queues and similar data structures.
|
||||
type Heap struct {
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// data stores objects and has a queue that keeps their ordering according
|
||||
// to the heap invariant.
|
||||
data *heapData
|
||||
|
||||
// closed indicates that the queue is closed.
|
||||
// It is mainly used to let Pop() exit its control loop while waiting for an item.
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Close the Heap and signals condition variables that may be waiting to pop
|
||||
// items from the heap.
|
||||
func (h *Heap) Close() {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
h.closed = true
|
||||
h.cond.Broadcast()
|
||||
}
|
||||
|
||||
// Add inserts an item, and puts it in the queue. The item is updated if it
|
||||
// already exists.
|
||||
func (h *Heap) Add(obj interface{}) error {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
h.data.items[key].obj = obj
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
h.addIfNotPresentLocked(key, obj)
|
||||
}
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Adds all the items in the list to the queue and then signals the condition
|
||||
// variable. It is useful when the caller would like to add all of the items
|
||||
// to the queue before consumer starts processing them.
|
||||
func (h *Heap) BulkAdd(list []interface{}) error {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
for _, obj := range list {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
h.data.items[key].obj = obj
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
h.addIfNotPresentLocked(key, obj)
|
||||
}
|
||||
}
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
|
||||
// the key is present in the map, no changes is made to the item.
|
||||
//
|
||||
// This is useful in a single producer/consumer scenario so that the consumer can
|
||||
// safely retry items without contending with the producer and potentially enqueueing
|
||||
// stale items.
|
||||
func (h *Heap) AddIfNotPresent(obj interface{}) error {
|
||||
id, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if h.closed {
|
||||
return fmt.Errorf(closedMsg)
|
||||
}
|
||||
h.addIfNotPresentLocked(id, obj)
|
||||
h.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
// addIfNotPresentLocked assumes the lock is already held and adds the the provided
|
||||
// item to the queue if it does not already exist.
|
||||
func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
return
|
||||
}
|
||||
heap.Push(h.data, &itemKeyValue{key, obj})
|
||||
}
|
||||
|
||||
// Update is the same as Add in this implementation. When the item does not
|
||||
// exist, it is added.
|
||||
func (h *Heap) Update(obj interface{}) error {
|
||||
return h.Add(obj)
|
||||
}
|
||||
|
||||
// Delete removes an item.
|
||||
func (h *Heap) Delete(obj interface{}) error {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
if item, ok := h.data.items[key]; ok {
|
||||
heap.Remove(h.data, item.index)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("object not found")
|
||||
}
|
||||
|
||||
// Pop waits until an item is ready. If multiple items are
|
||||
// ready, they are returned in the order given by Heap.data.lessFunc.
|
||||
func (h *Heap) Pop() (interface{}, error) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
for len(h.data.queue) == 0 {
|
||||
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
||||
// When Close() is called, the h.closed is set and the condition is broadcast,
|
||||
// which causes this loop to continue and return from the Pop().
|
||||
if h.closed {
|
||||
return nil, fmt.Errorf("heap is closed")
|
||||
}
|
||||
h.cond.Wait()
|
||||
}
|
||||
obj := heap.Pop(h.data)
|
||||
if obj != nil {
|
||||
return obj, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("object was removed from heap data")
|
||||
}
|
||||
}
|
||||
|
||||
// List returns a list of all the items.
|
||||
func (h *Heap) List() []interface{} {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
list := make([]interface{}, 0, len(h.data.items))
|
||||
for _, item := range h.data.items {
|
||||
list = append(list, item.obj)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
// ListKeys returns a list of all the keys of the objects currently in the Heap.
|
||||
func (h *Heap) ListKeys() []string {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
list := make([]string, 0, len(h.data.items))
|
||||
for key := range h.data.items {
|
||||
list = append(list, key)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
// Get returns the requested item, or sets exists=false.
|
||||
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return nil, false, KeyError{obj, err}
|
||||
}
|
||||
return h.GetByKey(key)
|
||||
}
|
||||
|
||||
// GetByKey returns the requested item, or sets exists=false.
|
||||
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
item, exists := h.data.items[key]
|
||||
if !exists {
|
||||
return nil, false, nil
|
||||
}
|
||||
return item.obj, true, nil
|
||||
}
|
||||
|
||||
// IsClosed returns true if the queue is closed.
|
||||
func (h *Heap) IsClosed() bool {
|
||||
h.lock.RLock()
|
||||
defer h.lock.RUnlock()
|
||||
if h.closed {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NewHeap returns a Heap which can be used to queue up items to process.
|
||||
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
|
||||
h := &Heap{
|
||||
data: &heapData{
|
||||
items: map[string]*heapItem{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFn,
|
||||
lessFunc: lessFn,
|
||||
},
|
||||
}
|
||||
h.cond.L = &h.lock
|
||||
return h
|
||||
}
|
15
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
15
vendor/k8s.io/client-go/tools/cache/listwatch.go
generated
vendored
|
@ -19,12 +19,15 @@ package cache
|
|||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
)
|
||||
|
||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||
|
@ -48,6 +51,9 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
|
|||
type ListWatch struct {
|
||||
ListFunc ListFunc
|
||||
WatchFunc WatchFunc
|
||||
// DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in
|
||||
// 1.9 will allow a controller to opt out of chunking.
|
||||
DisableChunking bool
|
||||
}
|
||||
|
||||
// Getter interface knows how to access Get method from RESTClient.
|
||||
|
@ -58,21 +64,21 @@ type Getter interface {
|
|||
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
|
||||
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
|
||||
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector.String()
|
||||
return c.Get().
|
||||
Namespace(namespace).
|
||||
Resource(resource).
|
||||
VersionedParams(&options, metav1.ParameterCodec).
|
||||
FieldsSelectorParam(fieldSelector).
|
||||
Do().
|
||||
Get()
|
||||
}
|
||||
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.Watch = true
|
||||
options.FieldSelector = fieldSelector.String()
|
||||
return c.Get().
|
||||
Namespace(namespace).
|
||||
Resource(resource).
|
||||
VersionedParams(&options, metav1.ParameterCodec).
|
||||
FieldsSelectorParam(fieldSelector).
|
||||
Watch()
|
||||
}
|
||||
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
|
||||
|
@ -87,6 +93,11 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
|
|||
|
||||
// List a set of apiserver resources
|
||||
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||
// chunking will become the default for list watchers starting in Kubernetes 1.9, unless
|
||||
// otherwise disabled.
|
||||
if false && !lw.DisableChunking {
|
||||
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
|
||||
}
|
||||
return lw.ListFunc(options)
|
||||
}
|
||||
|
||||
|
|
2
vendor/k8s.io/client-go/tools/cache/mutation_cache.go
generated
vendored
2
vendor/k8s.io/client-go/tools/cache/mutation_cache.go
generated
vendored
|
@ -156,7 +156,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
|
|||
}
|
||||
elements, err := fn(updated)
|
||||
if err != nil {
|
||||
glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
|
||||
glog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
|
||||
continue
|
||||
}
|
||||
for _, inIndex := range elements {
|
||||
|
|
16
vendor/k8s.io/client-go/tools/cache/mutation_detector.go
generated
vendored
16
vendor/k8s.io/client-go/tools/cache/mutation_detector.go
generated
vendored
|
@ -79,17 +79,15 @@ type cacheObj struct {
|
|||
|
||||
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
|
||||
// we DON'T want protection from panics. If we're running this code, we want to die
|
||||
go func() {
|
||||
for {
|
||||
d.CompareObjects()
|
||||
for {
|
||||
d.CompareObjects()
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-time.After(d.period):
|
||||
}
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-time.After(d.period):
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
|
||||
|
|
59
vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
59
vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
|
@ -30,6 +30,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -48,6 +49,8 @@ import (
|
|||
type Reflector struct {
|
||||
// name identifies this reflector. By default it will be a file:line if possible.
|
||||
name string
|
||||
// metrics tracks basic metric information about the reflector
|
||||
metrics *reflectorMetrics
|
||||
|
||||
// The type of object we expect to place in the store.
|
||||
expectedType reflect.Type
|
||||
|
@ -96,10 +99,17 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
|
|||
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
|
||||
}
|
||||
|
||||
// reflectorDisambiguator is used to disambiguate started reflectors.
|
||||
// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
|
||||
var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
|
||||
|
||||
// NewNamedReflector same as NewReflector, but with a specified name for logging
|
||||
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
|
||||
r := &Reflector{
|
||||
name: name,
|
||||
name: name,
|
||||
// we need this to be unique per process (some names are still the same)but obvious who it belongs to
|
||||
metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
|
||||
listerWatcher: lw,
|
||||
store: store,
|
||||
expectedType: reflect.TypeOf(expectedType),
|
||||
|
@ -110,6 +120,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||
return r
|
||||
}
|
||||
|
||||
func makeValidPromethusMetricName(in string) string {
|
||||
// this isn't perfect, but it removes our common characters
|
||||
return strings.NewReplacer("/", "_", ".", "_", "-", "_").Replace(in)
|
||||
}
|
||||
|
||||
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
||||
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
||||
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
|
||||
|
@ -182,21 +197,10 @@ func extractStackCreator() (string, int, bool) {
|
|||
}
|
||||
|
||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||
// Run starts a goroutine and returns immediately.
|
||||
func (r *Reflector) Run() {
|
||||
// Run will exit when stopCh is closed.
|
||||
func (r *Reflector) Run(stopCh <-chan struct{}) {
|
||||
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
|
||||
go wait.Until(func() {
|
||||
if err := r.ListAndWatch(wait.NeverStop); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
}, r.period, wait.NeverStop)
|
||||
}
|
||||
|
||||
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
|
||||
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
|
||||
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
|
||||
go wait.Until(func() {
|
||||
wait.Until(func() {
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
|
@ -235,17 +239,18 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
|||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
|
||||
var resourceVersion string
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer cleanup()
|
||||
|
||||
// Explicitly set "0" as resource version - it's fine for the List()
|
||||
// to be served from cache and potentially be delayed relative to
|
||||
// etcd contents. Reflector framework will catch up via Watch() eventually.
|
||||
options := metav1.ListOptions{ResourceVersion: "0"}
|
||||
r.metrics.numberOfLists.Inc()
|
||||
start := r.clock.Now()
|
||||
list, err := r.listerWatcher.List(options)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
||||
}
|
||||
r.metrics.listDuration.Observe(time.Since(start).Seconds())
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
|
||||
|
@ -255,6 +260,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
||||
}
|
||||
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
|
||||
if err := r.syncWith(items, resourceVersion); err != nil {
|
||||
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
|
||||
}
|
||||
|
@ -264,6 +270,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
go func() {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
|
@ -293,6 +303,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||
TimeoutSeconds: &timemoutseconds,
|
||||
}
|
||||
|
||||
r.metrics.numberOfWatches.Inc()
|
||||
w, err := r.listerWatcher.Watch(options)
|
||||
if err != nil {
|
||||
switch err {
|
||||
|
@ -344,6 +355,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
|
|||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||
// we're coming back in with the same watch interface.
|
||||
defer w.Stop()
|
||||
// update metrics
|
||||
defer func() {
|
||||
r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
|
||||
r.metrics.watchDuration.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
loop:
|
||||
for {
|
||||
|
@ -399,8 +415,8 @@ loop:
|
|||
|
||||
watchDuration := r.clock.Now().Sub(start)
|
||||
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
||||
return errors.New("very short watch")
|
||||
r.metrics.numberOfShortWatches.Inc()
|
||||
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
||||
}
|
||||
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
|
||||
return nil
|
||||
|
@ -418,4 +434,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
|
|||
r.lastSyncResourceVersionMutex.Lock()
|
||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||
r.lastSyncResourceVersion = v
|
||||
|
||||
rv, err := strconv.Atoi(v)
|
||||
if err == nil {
|
||||
r.metrics.lastResourceVersion.Set(float64(rv))
|
||||
}
|
||||
}
|
||||
|
|
119
vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
generated
vendored
Normal file
119
vendor/k8s.io/client-go/tools/cache/reflector_metrics.go
generated
vendored
Normal file
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||
// of metrics.
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down.
|
||||
type GaugeMetric interface {
|
||||
Set(float64)
|
||||
}
|
||||
|
||||
// CounterMetric represents a single numerical value that only ever
|
||||
// goes up.
|
||||
type CounterMetric interface {
|
||||
Inc()
|
||||
}
|
||||
|
||||
// SummaryMetric captures individual observations.
|
||||
type SummaryMetric interface {
|
||||
Observe(float64)
|
||||
}
|
||||
|
||||
type noopMetric struct{}
|
||||
|
||||
func (noopMetric) Inc() {}
|
||||
func (noopMetric) Dec() {}
|
||||
func (noopMetric) Observe(float64) {}
|
||||
func (noopMetric) Set(float64) {}
|
||||
|
||||
type reflectorMetrics struct {
|
||||
numberOfLists CounterMetric
|
||||
listDuration SummaryMetric
|
||||
numberOfItemsInList SummaryMetric
|
||||
|
||||
numberOfWatches CounterMetric
|
||||
numberOfShortWatches CounterMetric
|
||||
watchDuration SummaryMetric
|
||||
numberOfItemsInWatch SummaryMetric
|
||||
|
||||
lastResourceVersion GaugeMetric
|
||||
}
|
||||
|
||||
// MetricsProvider generates various metrics used by the reflector.
|
||||
type MetricsProvider interface {
|
||||
NewListsMetric(name string) CounterMetric
|
||||
NewListDurationMetric(name string) SummaryMetric
|
||||
NewItemsInListMetric(name string) SummaryMetric
|
||||
|
||||
NewWatchesMetric(name string) CounterMetric
|
||||
NewShortWatchesMetric(name string) CounterMetric
|
||||
NewWatchDurationMetric(name string) SummaryMetric
|
||||
NewItemsInWatchMetric(name string) SummaryMetric
|
||||
|
||||
NewLastResourceVersionMetric(name string) GaugeMetric
|
||||
}
|
||||
|
||||
type noopMetricsProvider struct{}
|
||||
|
||||
func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} }
|
||||
func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
var metricsFactory = struct {
|
||||
metricsProvider MetricsProvider
|
||||
setProviders sync.Once
|
||||
}{
|
||||
metricsProvider: noopMetricsProvider{},
|
||||
}
|
||||
|
||||
func newReflectorMetrics(name string) *reflectorMetrics {
|
||||
var ret *reflectorMetrics
|
||||
if len(name) == 0 {
|
||||
return ret
|
||||
}
|
||||
return &reflectorMetrics{
|
||||
numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name),
|
||||
listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name),
|
||||
numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name),
|
||||
numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name),
|
||||
numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
|
||||
watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name),
|
||||
numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
|
||||
lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
|
||||
}
|
||||
}
|
||||
|
||||
// SetReflectorMetricsProvider sets the metrics provider
|
||||
func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
|
||||
metricsFactory.setProviders.Do(func() {
|
||||
metricsFactory.metricsProvider = metricsProvider
|
||||
})
|
||||
}
|
170
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
170
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
|
@ -138,15 +138,12 @@ type sharedIndexInformer struct {
|
|||
// clock allows for testability
|
||||
clock clock.Clock
|
||||
|
||||
started bool
|
||||
startedLock sync.Mutex
|
||||
started, stopped bool
|
||||
startedLock sync.Mutex
|
||||
|
||||
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
||||
// can safely join the shared informer.
|
||||
blockDeltas sync.Mutex
|
||||
// stopCh is the channel used to stop the main Run process. We have to track it so that
|
||||
// late joiners can have a proper stop
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
||||
|
@ -207,16 +204,20 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
|||
s.started = true
|
||||
}()
|
||||
|
||||
s.stopCh = stopCh
|
||||
s.cacheMutationDetector.Run(stopCh)
|
||||
s.processor.run(stopCh)
|
||||
s.controller.Run(stopCh)
|
||||
}
|
||||
// Separate stop channel because Processor should be stopped strictly after controller
|
||||
processorStopCh := make(chan struct{})
|
||||
var wg wait.Group
|
||||
defer wg.Wait() // Wait for Processor to stop
|
||||
defer close(processorStopCh) // Tell Processor to stop
|
||||
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
|
||||
wg.StartWithChannel(processorStopCh, s.processor.run)
|
||||
|
||||
func (s *sharedIndexInformer) isStarted() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return s.started
|
||||
defer func() {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
s.stopped = true // Don't want any new listeners
|
||||
}()
|
||||
s.controller.Run(stopCh)
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) HasSynced() bool {
|
||||
|
@ -287,6 +288,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
|
||||
return
|
||||
}
|
||||
|
||||
if resyncPeriod > 0 {
|
||||
if resyncPeriod < minimumResyncPeriod {
|
||||
glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
|
||||
|
@ -322,14 +328,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
s.processor.addListener(listener)
|
||||
|
||||
go listener.run(s.stopCh)
|
||||
go listener.pop(s.stopCh)
|
||||
|
||||
items := s.indexer.List()
|
||||
for i := range items {
|
||||
listener.add(addNotification{newObj: items[i]})
|
||||
s.processor.addAndStartListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -369,12 +370,26 @@ type sharedProcessor struct {
|
|||
listeners []*processorListener
|
||||
syncingListeners []*processorListener
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
p.listeners = append(p.listeners, listener)
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
}
|
||||
|
@ -395,13 +410,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
|||
}
|
||||
|
||||
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
||||
func() {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
}()
|
||||
<-stopCh
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
for _, listener := range p.listeners {
|
||||
go listener.run(stopCh)
|
||||
go listener.pop(stopCh)
|
||||
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
|
||||
}
|
||||
p.wg.Wait() // Wait for all .pop() and .run() to stop
|
||||
}
|
||||
|
||||
// shouldResync queries every listener to determine if any of them need a resync, based on each
|
||||
|
@ -437,18 +460,8 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
|
|||
}
|
||||
|
||||
type processorListener struct {
|
||||
// lock/cond protects access to 'pendingNotifications'.
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
||||
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||
// added until we OOM.
|
||||
// TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||
// we should try to do something better
|
||||
pendingNotifications []interface{}
|
||||
|
||||
nextCh chan interface{}
|
||||
addCh chan interface{}
|
||||
|
||||
handler ResourceEventHandler
|
||||
|
||||
|
@ -466,80 +479,65 @@ type processorListener struct {
|
|||
|
||||
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
|
||||
ret := &processorListener{
|
||||
pendingNotifications: []interface{}{},
|
||||
nextCh: make(chan interface{}),
|
||||
addCh: make(chan interface{}),
|
||||
handler: handler,
|
||||
requestedResyncPeriod: requestedResyncPeriod,
|
||||
resyncPeriod: resyncPeriod,
|
||||
}
|
||||
|
||||
ret.cond.L = &ret.lock
|
||||
|
||||
ret.determineNextResync(now)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (p *processorListener) add(notification interface{}) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.pendingNotifications = append(p.pendingNotifications, notification)
|
||||
p.cond.Broadcast()
|
||||
p.addCh <- notification
|
||||
}
|
||||
|
||||
func (p *processorListener) pop(stopCh <-chan struct{}) {
|
||||
func (p *processorListener) pop() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer close(p.nextCh) // Tell .run() to stop
|
||||
|
||||
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
||||
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||
// added until we OOM.
|
||||
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||
// we should try to do something better
|
||||
var pendingNotifications []interface{}
|
||||
var nextCh chan<- interface{}
|
||||
var notification interface{}
|
||||
for {
|
||||
blockingGet := func() (interface{}, bool) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
for len(p.pendingNotifications) == 0 {
|
||||
// check if we're shutdown
|
||||
select {
|
||||
case <-stopCh:
|
||||
return nil, true
|
||||
default:
|
||||
}
|
||||
p.cond.Wait()
|
||||
}
|
||||
|
||||
nt := p.pendingNotifications[0]
|
||||
p.pendingNotifications = p.pendingNotifications[1:]
|
||||
return nt, false
|
||||
}
|
||||
|
||||
notification, stopped := blockingGet()
|
||||
if stopped {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case p.nextCh <- notification:
|
||||
case nextCh <- notification:
|
||||
// Notification dispatched
|
||||
if len(pendingNotifications) == 0 { // Nothing to pop
|
||||
nextCh = nil // Disable this select case
|
||||
notification = nil
|
||||
} else {
|
||||
notification = pendingNotifications[0]
|
||||
pendingNotifications[0] = nil
|
||||
pendingNotifications = pendingNotifications[1:]
|
||||
}
|
||||
case notificationToAdd, ok := <-p.addCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if notification == nil { // No notification to pop (and pendingNotifications is empty)
|
||||
// Optimize the case - skip adding to pendingNotifications
|
||||
notification = notificationToAdd
|
||||
nextCh = p.nextCh
|
||||
} else { // There is already a notification waiting to be dispatched
|
||||
pendingNotifications = append(pendingNotifications, notificationToAdd)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *processorListener) run(stopCh <-chan struct{}) {
|
||||
func (p *processorListener) run() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for {
|
||||
var next interface{}
|
||||
select {
|
||||
case <-stopCh:
|
||||
func() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.cond.Broadcast()
|
||||
}()
|
||||
return
|
||||
case next = <-p.nextCh:
|
||||
}
|
||||
|
||||
for next := range p.nextCh {
|
||||
switch notification := next.(type) {
|
||||
case updateNotification:
|
||||
p.handler.OnUpdate(notification.oldObj, notification.newObj)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue