Update code for latest k8s

Signed-off-by: Mrunal Patel <mrunalp@gmail.com>
This commit is contained in:
Mrunal Patel 2018-02-12 12:13:07 -08:00
parent 8f5e37a83c
commit 5f7ac28059
792 changed files with 25023 additions and 19841 deletions

View file

@ -59,7 +59,7 @@ type Queue interface {
// has since been added.
AddIfNotPresent(interface{}) error
// Return true if the first batch of items has been popped
// HasSynced returns true if the first batch of items has been popped
HasSynced() bool
// Close queue

View file

@ -63,8 +63,18 @@ type Getter interface {
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
@ -74,7 +84,7 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
options.FieldSelector = fieldSelector.String()
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).

View file

@ -108,8 +108,8 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
r := &Reflector{
name: name,
// we need this to be unique per process (some names are still the same)but obvious who it belongs to
metrics: newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
// we need this to be unique per process (some names are still the same) but obvious who it belongs to
metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
@ -120,7 +120,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
return r
}
func makeValidPromethusMetricLabel(in string) string {
func makeValidPrometheusMetricLabel(in string) string {
// this isn't perfect, but it removes our common characters
return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
}
@ -302,12 +302,12 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
default:
}
timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timemoutseconds,
TimeoutSeconds: &timeoutSeconds,
}
r.metrics.numberOfWatches.Inc()

View file

@ -26,6 +26,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"
"k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@ -540,20 +541,35 @@ func (p *processorListener) pop() {
}
func (p *processorListener) run() {
defer utilruntime.HandleCrash()
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}
}, 1*time.Minute, stopCh)
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,

View file

@ -1,7 +1,7 @@
// +build !ignore_autogenerated
/*
Copyright 2017 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.