/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/conversion"
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
	federationcache "k8s.io/kubernetes/federation/client/cache"
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
	"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
	"k8s.io/kubernetes/pkg/api"
	v1 "k8s.io/kubernetes/pkg/api/v1"
	cache "k8s.io/kubernetes/pkg/client/cache"
	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

const (
	serviceSyncPeriod = 10 * time.Minute
	clusterSyncPeriod = 10 * time.Minute

	// How long to wait before retrying the processing of a service change.
	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
	// should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second

	// clientRetryCount is the number of attempts made when accessing a remote
	// kube-apiserver or federation apiserver; a short sleep is used between
	// attempts when a failure occurs, so no exponential backoff is needed.
	clientRetryCount = 5

	retryable = true

	doNotRetry = time.Duration(0)

	UserAgentName = "federation-service-controller"
	KubeAPIQPS    = 20.0
	KubeAPIBurst  = 30

	maxNoOfClusters = 100

	updateTimeout         = 30 * time.Second
	allClustersKey        = "ALL_CLUSTERS"
	clusterAvailableDelay = time.Second * 20
)

type cachedService struct {
	lastState *v1.Service
	// The state as successfully applied to the DNS server
	appliedState *v1.Service
	// endpointMap holds endpoint subset info from the kubernetes clusters.
	// key: clusterName
	// value: a flag indicating whether a ready address exists; 1 means at least one address is ready
	endpointMap map[string]int
	// serviceStatusMap holds service status info from the kubernetes clusters, keyed on clusterName
	serviceStatusMap map[string]v1.LoadBalancerStatus
	// Ensures only one goroutine can operate on this service at any given time.
	rwlock sync.Mutex
	// Controls error back-off for propagating the federation service to k8s clusters
	lastRetryDelay time.Duration
	// Controls error back-off for updating the federation service back to the federation apiserver
	lastFedUpdateDelay time.Duration
	// Controls error back-off for DNS record updates
	lastDNSUpdateDelay time.Duration
}

type serviceCache struct {
	rwlock sync.Mutex // protects fedServiceMap
	// fedServiceMap contains all services received from the federation apiserver.
	// key: serviceName
	fedServiceMap map[string]*cachedService
}

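// Note: the serviceName keys in fedServiceMap are the workqueue keys produced by
// controller.KeyFunc in enqueueService below, i.e. "<namespace>/<name>" strings
// such as "default/my-service".
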
type ServiceController struct {
	dns              dnsprovider.Interface
	federationClient fedclientset.Interface
	federationName   string
	// serviceDnsSuffix is the DNS suffix we use when publishing service DNS names
	serviceDnsSuffix string
	// zoneName and zoneID are used to identify the zone in which to put records
	zoneName string
	zoneID   string
	// each federation should be configured with a single zone (e.g. "mycompany.com")
	dnsZones     dnsprovider.Zones
	serviceCache *serviceCache
	clusterCache *clusterClientCache
	// A store of services, populated by the serviceController
	serviceStore cache.StoreToServiceLister
	// Watches changes to all services
	serviceController cache.Controller
	federatedInformer fedutil.FederatedInformer
	// A store of clusters, populated by the clusterController
	clusterStore federationcache.StoreToClusterLister
	// Watches changes to all clusters
	clusterController cache.Controller
	eventBroadcaster  record.EventBroadcaster
	eventRecorder     record.EventRecorder
	// services that need to be synced
	queue           *workqueue.Type
	knownClusterSet sets.String
	// endpointWorkerMap contains all the registered clusters, with a flag indicating whether a worker exists.
	// key: clusterName
	endpointWorkerMap map[string]bool
	// channel for a worker to signal that it is going out of existence
	endpointWorkerDoneChan chan string
	// serviceWorkerMap contains all the registered clusters, with a flag indicating whether a worker exists.
	// key: clusterName
	serviceWorkerMap map[string]bool
	// channel for a worker to signal that it is going out of existence
	serviceWorkerDoneChan chan string

	// For triggering reconciliation of all services. This is used when
	// a new cluster becomes available.
	clusterDeliverer *util.DelayingDeliverer

	deletionHelper *deletionhelper.DeletionHelper
}

// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
	federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
	broadcaster := record.NewBroadcaster()
	// federationClient event is not supported yet
	// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	recorder := broadcaster.NewRecorder(v1.EventSource{Component: UserAgentName})

	s := &ServiceController{
		dns:              dns,
		federationClient: federationClient,
		federationName:   federationName,
		serviceDnsSuffix: serviceDnsSuffix,
		zoneName:         zoneName,
		zoneID:           zoneID,
		serviceCache:     &serviceCache{fedServiceMap: make(map[string]*cachedService)},
		clusterCache: &clusterClientCache{
			rwlock:    sync.Mutex{},
			clientMap: make(map[string]*clusterCache),
		},
		eventBroadcaster: broadcaster,
		eventRecorder:    recorder,
		queue:            workqueue.New(),
		knownClusterSet:  make(sets.String),
	}
	s.clusterDeliverer = util.NewDelayingDeliverer()
	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
				return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return s.federationClient.Core().Services(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.Service{},
		serviceSyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc: s.enqueueService,
			UpdateFunc: func(old, cur interface{}) {
				// There are cases where old and cur are equal but we still receive the event;
				// only enqueue the service when something actually changed.
				if !reflect.DeepEqual(old, cur) {
					s.enqueueService(cur)
				}
			},
			DeleteFunc: s.enqueueService,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	s.clusterStore.Store, s.clusterController = cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
				return s.federationClient.Federation().Clusters().List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return s.federationClient.Federation().Clusters().Watch(options)
			},
		},
		&v1beta1.Cluster{},
		clusterSyncPeriod,
		cache.ResourceEventHandlerFuncs{
			DeleteFunc: s.clusterCache.delFromClusterSet,
			AddFunc:    s.clusterCache.addToClientMap,
			UpdateFunc: func(old, cur interface{}) {
				oldCluster, ok := old.(*v1beta1.Cluster)
				if !ok {
					return
				}
				curCluster, ok := cur.(*v1beta1.Cluster)
				if !ok {
					return
				}
				if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
					// update when the spec is changed
					s.clusterCache.addToClientMap(cur)
				}

				pred := getClusterConditionPredicate()
				// only update when the condition changed from not-ready to ready
				if !pred(*oldCluster) && pred(*curCluster) {
					s.clusterCache.addToClientMap(cur)
				}
				// The ready -> not-ready transition is not handled here.
				// How could we stop a controller?
			},
		},
	)

	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
		ClusterAvailable: func(cluster *v1beta1.Cluster) {
			s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
		},
	}
	fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
		return cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
					return targetClient.Core().Services(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return targetClient.Core().Services(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Service{},
			controller.NoResyncPeriodFunc(),
			// Trigger reconciliation whenever something in a federated cluster is changed. In most cases it
			// would be just confirmation that some service operation succeeded.
			util.NewTriggerOnAllChanges(
				func(obj pkgruntime.Object) {
					// TODO: Use this to enqueue services.
				},
			))
	}

	s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)

	federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			_, err := client.Core().Services(svc.Namespace).Create(svc)
			return err
		},
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			_, err := client.Core().Services(svc.Namespace).Update(svc)
			return err
		},
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			err := client.Core().Services(svc.Namespace).Delete(svc.Name, &v1.DeleteOptions{})
			return err
		})

	s.deletionHelper = deletionhelper.NewDeletionHelper(
		s.hasFinalizerFunc,
		s.removeFinalizerFunc,
		s.addFinalizerFunc,
		// objNameFunc
		func(obj pkgruntime.Object) string {
			service := obj.(*v1.Service)
			return service.Name
		},
		updateTimeout,
		s.eventRecorder,
		s.federatedInformer,
		federatedUpdater,
	)

	s.endpointWorkerMap = make(map[string]bool)
	s.serviceWorkerMap = make(map[string]bool)
	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
	s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
	return s
}

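// A minimal usage sketch (illustrative only; construction of the federation clientset,
// the dnsprovider and the stop channel is assumed to happen elsewhere, e.g. in the
// federation controller-manager, and the literal arguments below are placeholders):
//
//	sc := service.New(fedClient, dnsProvider, "myfederation",
//		"federation.example.com", "example.com", "")
//	if err := sc.Run(5, wait.NeverStop); err != nil {
//		glog.Fatalf("Failed to start federation service controller: %v", err)
//	}
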
// Returns true if the given object has the given finalizer in its ObjectMeta.
func (s *ServiceController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
	service := obj.(*v1.Service)
	for i := range service.ObjectMeta.Finalizers {
		if string(service.ObjectMeta.Finalizers[i]) == finalizer {
			return true
		}
	}
	return false
}

// Removes the finalizer from the given object's ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) removeFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
	service := obj.(*v1.Service)
	newFinalizers := []string{}
	hasFinalizer := false
	for i := range service.ObjectMeta.Finalizers {
		if string(service.ObjectMeta.Finalizers[i]) != finalizer {
			newFinalizers = append(newFinalizers, service.ObjectMeta.Finalizers[i])
		} else {
			hasFinalizer = true
		}
	}
	if !hasFinalizer {
		// Nothing to do.
		return obj, nil
	}
	service.ObjectMeta.Finalizers = newFinalizers
	service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
	if err != nil {
		return nil, fmt.Errorf("failed to remove finalizer %s from service %s: %v", finalizer, service.Name, err)
	}
	return service, nil
}

// Adds the given finalizer to the given object's ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) addFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
	service := obj.(*v1.Service)
	service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, finalizer)
	service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
	if err != nil {
		return nil, fmt.Errorf("failed to add finalizer %s to service %s: %v", finalizer, service.Name, err)
	}
	return service, nil
}

// obj could be a *v1.Service, or a DeletionFinalStateUnknown marker item.
func (s *ServiceController) enqueueService(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	s.queue.Add(key)
}

// Run starts a background goroutine that watches for changes to federation services
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
// federationSyncPeriod controls how often we check the federation's services to
// ensure that the correct Kubernetes services (and associated DNS entries) exist.
// This is only necessary to fudge over failed watches.
// clusterSyncPeriod controls how often we check the federation's underlying clusters and
// their Kubernetes services to ensure that matching services created independently of the Federation
// (e.g. directly via the underlying cluster's API) are correctly accounted for.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
	if err := s.init(); err != nil {
		return err
	}
	defer runtime.HandleCrash()
	s.federatedInformer.Start()
	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
		// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
	})
	go s.serviceController.Run(stopCh)
	go s.clusterController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(s.fedServiceWorker, time.Second, stopCh)
	}
	go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
	go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
	go wait.Until(s.clusterSyncLoop, time.Second, stopCh)
	<-stopCh
	glog.Infof("Shutting down Federation Service Controller")
	s.queue.ShutDown()
	s.federatedInformer.Stop()
	return nil
}

func (s *ServiceController) init() error {
	if s.federationName == "" {
		return fmt.Errorf("ServiceController should not be run without federationName.")
	}
	if s.zoneName == "" && s.zoneID == "" {
		return fmt.Errorf("ServiceController must be run with either zoneName or zoneID.")
	}
	if s.serviceDnsSuffix == "" {
		// TODO: Is this the right place to do defaulting?
		if s.zoneName == "" {
			return fmt.Errorf("ServiceController must be run with zoneName, if serviceDnsSuffix is not set.")
		}
		s.serviceDnsSuffix = s.zoneName
	}
	if s.dns == nil {
		return fmt.Errorf("ServiceController should not be run without a dnsprovider.")
	}
	zones, ok := s.dns.Zones()
	if !ok {
		return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records.")
	}
	s.dnsZones = zones
	matchingZones, err := getDnsZones(s.zoneName, s.zoneID, s.dnsZones)
	if err != nil {
		return fmt.Errorf("error querying for DNS zones: %v", err)
	}
	if len(matchingZones) == 0 {
		if s.zoneName == "" {
			return fmt.Errorf("ServiceController must be run with zoneName to create zone automatically.")
		}
		glog.Infof("DNS zone %q not found. Creating DNS zone %q.", s.zoneName, s.zoneName)
		managedZone, err := s.dnsZones.New(s.zoneName)
		if err != nil {
			return err
		}
		zone, err := s.dnsZones.Add(managedZone)
		if err != nil {
			return err
		}
		glog.Infof("DNS zone %q successfully created. Note that DNS resolution will not work until you have registered this name with "+
			"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider.", zone.Name())
	}
	if len(matchingZones) > 1 {
		return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
	}
	return nil
}

// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that syncService is never invoked concurrently with the same key.
func (s *ServiceController) fedServiceWorker() {
	for {
		func() {
			key, quit := s.queue.Get()
			if quit {
				return
			}

			defer s.queue.Done(key)
			err := s.syncService(key.(string))
			if err != nil {
				glog.Errorf("Error syncing service: %v", err)
			}
		}()
	}
}

func wantsDNSRecords(service *v1.Service) bool {
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
}

// processServiceForCluster creates or updates the service in the given registered running cluster,
// updates DNS records and updates the service info with DNS entries to the federation apiserver.
// The function returns any error caught.
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
	glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
	// Create or Update k8s Service
	err := s.ensureClusterService(cachedService, clusterName, service, client)
	if err != nil {
		glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
		return err
	}
	glog.V(4).Infof("Successfully processed service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
	return nil
}

// updateFederationService returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
	// Clone the federation service, and create it in the underlying k8s clusters
	desiredService := &v1.Service{
		ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta),
		Spec:       *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)),
	}

	// Handle the available clusters in parallel, and wait for all of them so that
	// hasErr is not read before the goroutines have finished writing it.
	var hasErr bool
	var mu sync.Mutex
	var wg sync.WaitGroup
	for clusterName, cache := range s.clusterCache.clientMap {
		wg.Add(1)
		go func(cache *clusterCache, clusterName string) {
			defer wg.Done()
			err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
			if err != nil {
				mu.Lock()
				hasErr = true
				mu.Unlock()
			}
		}(cache, clusterName)
	}
	wg.Wait()
	if hasErr {
		// detailed error has been dumped inside the loop
		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable
	}
	return nil, !retryable
}

func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
	var err error
	var needUpdate bool
	for i := 0; i < clientRetryCount; i++ {
		svc, err := client.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
		if err == nil {
			// service exists
			glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
			// preserve immutable fields
			service.Spec.ClusterIP = svc.Spec.ClusterIP

			// preserve auto-assigned fields
			for i, oldPort := range svc.Spec.Ports {
				for _, port := range service.Spec.Ports {
					if port.NodePort == 0 {
						if !portEqualExcludeNodePort(&oldPort, &port) {
							svc.Spec.Ports[i] = port
							needUpdate = true
						}
					} else {
						if !portEqualForLB(&oldPort, &port) {
							svc.Spec.Ports[i] = port
							needUpdate = true
						}
					}
				}
			}

			if needUpdate {
				// we only apply spec update
				svc.Spec = service.Spec
				_, err = client.Core().Services(svc.Namespace).Update(svc)
				if err == nil {
					glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName)
					return nil
				} else {
					glog.V(4).Infof("Failed to update %+v", err)
				}
			} else {
				glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec is identical", svc.Namespace, svc.Name, clusterName)
				return nil
			}
		} else if errors.IsNotFound(err) {
			// Create the service if it is not found
			glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new",
				service.Namespace, service.Name, clusterName)
			service.ResourceVersion = ""
			_, err = client.Core().Services(service.Namespace).Create(service)
			if err == nil {
				glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName)
				return nil
			}
			glog.V(4).Infof("Failed to create %+v", err)
			if errors.IsAlreadyExists(err) {
				glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
				return nil
			}
		}
		if errors.IsConflict(err) {
			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
				service.Namespace, service.Name, err)
		}
		// should we reuse the same retry delay for all clusters?
		time.Sleep(cachedService.nextRetryDelay())
	}
	return err
}

func (s *serviceCache) allServices() []*cachedService {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	services := make([]*cachedService, 0, len(s.fedServiceMap))
	for _, v := range s.fedServiceMap {
		services = append(services, v)
	}
	return services
}

func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	service, ok := s.fedServiceMap[serviceName]
	return service, ok
}

func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	service, ok := s.fedServiceMap[serviceName]
	if !ok {
		service = &cachedService{
			endpointMap:      make(map[string]int),
			serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
		}
		s.fedServiceMap[serviceName] = service
	}
	return service
}

func (s *serviceCache) set(serviceName string, service *cachedService) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	s.fedServiceMap[serviceName] = service
}

func (s *serviceCache) delete(serviceName string) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	delete(s.fedServiceMap, serviceName)
}

// needsUpdateDNS checks whether the dns records of the given service should be updated
func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
	if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
		return false
	}
	if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
			oldService.Spec.Type, newService.Spec.Type)
		return true
	}
	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}
	if !LoadBalancerIPsAreEqual(oldService, newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
		return true
	}
	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
		return true
	}
	for i := range oldService.Spec.ExternalIPs {
		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
			s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
				newService.Spec.ExternalIPs[i])
			return true
		}
	}
	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
		return true
	}
	if oldService.UID != newService.UID {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
			oldService.UID, newService.UID)
		return true
	}

	return false
}

func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
	// TODO: quinton: Probably applies for DNS SVC records. Come back to this.
	//var protocol api.Protocol

	ports := []*v1.ServicePort{}
	for i := range service.Spec.Ports {
		sp := &service.Spec.Ports[i]
		// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation
		ports = append(ports, sp)
	}
	return ports, nil
}

func portsEqualForLB(x, y *v1.Service) bool {
	xPorts, err := getPortsForLB(x)
	if err != nil {
		return false
	}
	yPorts, err := getPortsForLB(y)
	if err != nil {
		return false
	}
	return portSlicesEqualForLB(xPorts, yPorts)
}

func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
	if len(x) != len(y) {
		return false
	}

	for i := range x {
		if !portEqualForLB(x[i], y[i]) {
			return false
		}
	}
	return true
}

func portEqualForLB(x, y *v1.ServicePort) bool {
	// TODO: Should we check name? (In theory, an LB could expose it)
	if x.Name != y.Name {
		return false
	}

	if x.Protocol != y.Protocol {
		return false
	}

	if x.Port != y.Port {
		return false
	}
	if x.NodePort != y.NodePort {
		return false
	}
	return true
}

func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
	// TODO: Should we check name? (In theory, an LB could expose it)
	if x.Name != y.Name {
		return false
	}

	if x.Protocol != y.Protocol {
		return false
	}

	if x.Port != y.Port {
		return false
	}
	return true
}

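// Note on the two comparators above: ensureClusterService compares with
// portEqualExcludeNodePort when the desired port has NodePort == 0 (the node port is
// auto-assigned by the cluster and should be preserved), and with portEqualForLB when
// an explicit NodePort is set and must match as well.
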
func clustersFromList(list *v1beta1.ClusterList) []string {
	result := []string{}
	for ix := range list.Items {
		result = append(result, list.Items[ix].Name)
	}
	return result
}

// getClusterConditionPredicate returns a predicate that accepts only clusters whose
// Ready condition has status True.
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
	return func(cluster v1beta1.Cluster) bool {
		// If we have no info, don't accept
		if len(cluster.Status.Conditions) == 0 {
			return false
		}
		for _, cond := range cluster.Status.Conditions {
			// We consider the cluster for load balancing only when its ClusterReady condition status
			// is ConditionTrue
			if cond.Type == v1beta1.ClusterReady && cond.Status != v1.ConditionTrue {
				glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
				return false
			}
		}
		return true
	}
}

// clusterSyncLoop observes changes to the set of running clusters, applies all services to
// newly added clusters, and adds dns records for the changes.
func (s *ServiceController) clusterSyncLoop() {
	var servicesToUpdate []*cachedService
	// Should we remove the cache for a cluster that goes from ready to not-ready? If not, the condition predicate should be removed.
	clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
	if err != nil {
		glog.Infof("Failed to get cluster list")
		return
	}
	newClusters := clustersFromList(&clusters)
	var newSet, increase sets.String
	newSet = sets.NewString(newClusters...)
	if newSet.Equal(s.knownClusterSet) {
		// The set of cluster names in the services in the federation hasn't changed, but we can retry
		// updating any services that we failed to update last time around.
		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
		return
	}
	glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
	increase = newSet.Difference(s.knownClusterSet)
	// do nothing when a cluster is removed.
	if increase != nil {
		// Try updating all services, and save the ones that fail to try again next
		// round.
		servicesToUpdate = s.serviceCache.allServices()
		numServices := len(servicesToUpdate)
		for newCluster := range increase {
			glog.Infof("New cluster observed %s", newCluster)
			s.updateAllServicesToCluster(servicesToUpdate, newCluster)
		}
		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
		glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
			numServices-len(servicesToUpdate), numServices)
	}
	s.knownClusterSet = newSet
}

func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
	cluster, ok := s.clusterCache.clientMap[clusterName]
	if ok {
		for _, cachedService := range services {
			appliedState := cachedService.lastState
			s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
		}
	}
}

// updateDNSRecords updates all existing federation service DNS Records so that
// they will match the list of cluster names provided.
// Returns the list of services that couldn't be updated.
func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
	for _, service := range services {
		func() {
			service.rwlock.Lock()
			defer service.rwlock.Unlock()
			// If the applied state is nil, that means it hasn't yet been successfully dealt
			// with by the DNS Record reconciler. We can trust the DNS Record
			// reconciler to ensure the federation service's DNS records are created to target
			// the correct backend service IPs.
			if service.appliedState == nil {
				return
			}
			if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
				glog.Errorf("External error while updating DNS Records: %v.", err)
				servicesToRetry = append(servicesToRetry, service)
			}
		}()
	}
	return servicesToRetry
}

// lockedUpdateDNSRecords updates the DNS records of a service, assuming we hold the mutex
// associated with the service.
func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
	if !wantsDNSRecords(service.appliedState) {
		return nil
	}

	ensuredCount := 0
	unensuredCount := 0
	for key := range s.clusterCache.clientMap {
		for _, clusterName := range clusterNames {
			if key == clusterName {
				err := s.ensureDnsRecords(clusterName, service)
				if err != nil {
					unensuredCount += 1
					glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
				} else {
					ensuredCount += 1
				}
			}
		}
	}
	missedCount := len(clusterNames) - ensuredCount - unensuredCount
	if missedCount > 0 || unensuredCount > 0 {
		return fmt.Errorf("Failed to update DNS records for %d clusters for service %v due to missing clients [missed count: %d] and/or failing to ensure DNS records [unensured count: %d]",
			len(clusterNames), service, missedCount, unensuredCount)
	}
	return nil
}

func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}

// Computes the next retry, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextRetryDelay() time.Duration {
	s.lastRetryDelay = s.lastRetryDelay * 2
	if s.lastRetryDelay < minRetryDelay {
		s.lastRetryDelay = minRetryDelay
	}
	if s.lastRetryDelay > maxRetryDelay {
		s.lastRetryDelay = maxRetryDelay
	}
	return s.lastRetryDelay
}

// resetRetryDelay resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetRetryDelay() {
	s.lastRetryDelay = time.Duration(0)
}

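// For illustration: with minRetryDelay = 5s and maxRetryDelay = 300s, successive calls to
// nextRetryDelay() return 5s, 10s, 20s, 40s, 80s, 160s, 300s, 300s, ... until
// resetRetryDelay() zeroes the backoff again. nextFedUpdateDelay and nextDNSUpdateDelay
// below follow the same pattern for their respective delays.
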
// Computes the next retry, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextFedUpdateDelay() time.Duration {
	s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
	if s.lastFedUpdateDelay < minRetryDelay {
		s.lastFedUpdateDelay = minRetryDelay
	}
	if s.lastFedUpdateDelay > maxRetryDelay {
		s.lastFedUpdateDelay = maxRetryDelay
	}
	return s.lastFedUpdateDelay
}

// resetFedUpdateDelay resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetFedUpdateDelay() {
	s.lastFedUpdateDelay = time.Duration(0)
}

// Computes the next retry, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextDNSUpdateDelay() time.Duration {
	s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
	if s.lastDNSUpdateDelay < minRetryDelay {
		s.lastDNSUpdateDelay = minRetryDelay
	}
	if s.lastDNSUpdateDelay > maxRetryDelay {
		s.lastDNSUpdateDelay = maxRetryDelay
	}
	return s.lastDNSUpdateDelay
}

// resetDNSUpdateDelay resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetDNSUpdateDelay() {
	s.lastDNSUpdateDelay = time.Duration(0)
}

// syncService will sync the Service with the given key. This function is not meant to be
// invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
	startTime := time.Now()
	var cachedService *cachedService
	var retryDelay time.Duration
	defer func() {
		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
	}()
	// obj holds the latest service info from the apiserver
	objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key)
	if err != nil {
		glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
		s.queue.Add(key)
		return err
	}
	if !exists {
		// Absence of the service in the store means the watcher caught the deletion; ensure LB info is cleaned.
		glog.Infof("Service has been deleted %v", key)
		err, retryDelay = s.processServiceDeletion(key)
	}
	// Create a copy before modifying the obj to prevent a race condition with
	// other readers of obj from the store.
	obj, err := conversion.NewCloner().DeepCopy(objFromStore)
	if err != nil {
		glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
		s.queue.Add(key)
		return err
	}

	if exists {
		service, ok := obj.(*v1.Service)
		if ok {
			cachedService = s.serviceCache.getOrCreate(key)
			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
		} else {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
			}
			glog.Infof("Found tombstone for %v", key)
			err, retryDelay = s.processServiceDeletion(tombstone.Key)
		}
	}

	if retryDelay != 0 {
		s.enqueueService(obj)
	} else if err != nil {
		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
	}
	return nil
}

// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
	// Ensure that no other goroutine will interfere with our processing of the
	// service.
	cachedService.rwlock.Lock()
	defer cachedService.rwlock.Unlock()

	if service.DeletionTimestamp != nil {
		if err := s.delete(service); err != nil {
			glog.Errorf("Failed to delete %s: %v", service, err)
			s.eventRecorder.Eventf(service, api.EventTypeNormal, "DeleteFailed",
				"Service delete failed: %v", err)
			return err, cachedService.nextRetryDelay()
		}
		return nil, doNotRetry
	}

	glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s",
		service.Name)
	// Add the required finalizers before creating a service in underlying clusters.
	updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service)
	if err != nil {
		glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v",
			service.Name, err)
		return err, cachedService.nextRetryDelay()
	}
	service = updatedServiceObj.(*v1.Service)

	glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name)

	// Update the cached service (used above for populating synthetic deletes).
	// Always trust the service retrieved from the serviceStore, which holds the latest service info from the apiserver;
	// if the same service is changed before this goroutine finishes, there will be another queue entry to handle that.
	cachedService.lastState = service
	err, retry := s.updateFederationService(key, cachedService)
	if err != nil {
		message := "Error occurred when updating service to all clusters"
		if retry {
			message += " (will retry): "
		} else {
			message += " (will not retry): "
		}
		message += err.Error()
		s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
		return err, cachedService.nextRetryDelay()
	}
	// Always update the cache upon success.
	// NOTE: Since we update the cached service if and only if we successfully
	// processed it, a cached service being nil implies that it hasn't yet
	// been successfully processed.

	cachedService.appliedState = service
	s.serviceCache.set(key, cachedService)
	glog.V(4).Infof("Successfully processed service %s", key)
	cachedService.resetRetryDelay()
	return nil, doNotRetry
}

// delete deletes the given service or returns an error if the deletion was not complete.
func (s *ServiceController) delete(service *v1.Service) error {
	glog.V(3).Infof("Handling deletion of service: %v", *service)
	_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(service)
	if err != nil {
		return err
	}

	err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
	if err != nil {
		// It's all good if the error is a not-found error. That means the service is deleted already and we do not have to do anything.
		// This is expected when we are processing an update as a result of service finalizer deletion.
		// The process that deleted the last finalizer is also going to delete the service and we do not have to do anything.
		if !errors.IsNotFound(err) {
			return fmt.Errorf("failed to delete service: %v", err)
		}
	}
	return nil
}

// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
	glog.V(2).Infof("Process service deletion for %v", key)
	s.serviceCache.delete(key)
	return nil, doNotRetry
}