From ac882d9c68b5ace8fdb17535a9ed1d335ebd5cef Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Thu, 17 Jan 2019 11:15:21 +0100 Subject: [PATCH] Allow exponential backoff to save PVs to API server Exponential backoff must be explicitly requested by provisioner. --- controller/controller.go | 120 +++++++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 37 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index ecbf129..5d035c0 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -131,6 +131,7 @@ type ProvisionController struct { exponentialBackOffOnError bool threadiness int + createProvisionedPVBackoff *wait.Backoff createProvisionedPVRetryCount int createProvisionedPVInterval time.Duration @@ -244,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov if c.HasRun() { return errRuntime } + if c.createProvisionedPVBackoff != nil { + return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount") + } c.createProvisionedPVRetryCount = createProvisionedPVRetryCount return nil } @@ -256,11 +260,34 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func if c.HasRun() { return errRuntime } + if c.createProvisionedPVBackoff != nil { + return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval") + } c.createProvisionedPVInterval = createProvisionedPVInterval return nil } } +// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a +// PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times. +// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff +// can be used. +func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error { + return func(c *ProvisionController) error { + if c.HasRun() { + return errRuntime + } + if c.createProvisionedPVRetryCount != 0 { + return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount") + } + if c.createProvisionedPVInterval != 0 { + return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval") + } + c.createProvisionedPVBackoff = &backoff + return nil + } +} + // FailedProvisionThreshold is the threshold for max number of retries on // failures of Provision. Defaults to 15. func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error { @@ -452,34 +479,35 @@ func NewProvisionController( eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component}) controller := &ProvisionController{ - client: client, - provisionerName: provisionerName, - provisioner: provisioner, - kubeVersion: utilversion.MustParseSemantic(kubeVersion), - id: id, - component: component, - eventRecorder: eventRecorder, - resyncPeriod: DefaultResyncPeriod, - exponentialBackOffOnError: DefaultExponentialBackOffOnError, - threadiness: DefaultThreadiness, - createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount, - createProvisionedPVInterval: DefaultCreateProvisionedPVInterval, - failedProvisionThreshold: DefaultFailedProvisionThreshold, - failedDeleteThreshold: DefaultFailedDeleteThreshold, - leaderElection: DefaultLeaderElection, - leaderElectionNamespace: getInClusterNamespace(), - leaseDuration: DefaultLeaseDuration, - renewDeadline: DefaultRenewDeadline, - retryPeriod: DefaultRetryPeriod, - metricsPort: DefaultMetricsPort, - metricsAddress: DefaultMetricsAddress, - metricsPath: DefaultMetricsPath, - hasRun: false, - hasRunLock: &sync.Mutex{}, + client: client, + provisionerName: provisionerName, + provisioner: provisioner, + kubeVersion: utilversion.MustParseSemantic(kubeVersion), + id: id, + component: component, + eventRecorder: eventRecorder, + resyncPeriod: DefaultResyncPeriod, + exponentialBackOffOnError: DefaultExponentialBackOffOnError, + threadiness: DefaultThreadiness, + failedProvisionThreshold: DefaultFailedProvisionThreshold, + failedDeleteThreshold: DefaultFailedDeleteThreshold, + leaderElection: DefaultLeaderElection, + leaderElectionNamespace: getInClusterNamespace(), + leaseDuration: DefaultLeaseDuration, + renewDeadline: DefaultRenewDeadline, + retryPeriod: DefaultRetryPeriod, + metricsPort: DefaultMetricsPort, + metricsAddress: DefaultMetricsAddress, + metricsPath: DefaultMetricsPath, + hasRun: false, + hasRunLock: &sync.Mutex{}, } for _, option := range options { - option(controller) + err := option(controller) + if err != nil { + glog.Fatalf("Error processing controller options: %s", err) + } } var rateLimiter workqueue.RateLimiter @@ -500,6 +528,22 @@ func NewProvisionController( controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims") controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes") + if controller.createProvisionedPVBackoff == nil { + // Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default. + if controller.createProvisionedPVInterval == 0 { + controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval + } + if controller.createProvisionedPVRetryCount == 0 { + controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount + } + controller.createProvisionedPVBackoff = &wait.Backoff{ + Duration: controller.createProvisionedPVInterval, + Factor: 1, // linear backoff + Steps: controller.createProvisionedPVRetryCount, + Cap: controller.createProvisionedPVInterval, + } + } + informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod) // ---------------------- @@ -1077,47 +1121,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol } // Try to create the PV object several times - for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { + var lastSaveError error + err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) { glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name)) if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) { // Save succeeded. if err != nil { glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name)) - err = nil } else { glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name)) } - break + return true, nil } // Save failed, try again after a while. glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err)) - time.Sleep(ctrl.createProvisionedPVInterval) - } + lastSaveError = err + return false, nil + }) if err != nil { // Save failed. Now we have a storage asset outside of Kubernetes, // but we don't have appropriate PV object for it. // Emit some event here and try to delete the storage asset several // times. - strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err) + strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError) glog.Error(logOperation(operation, strerr)) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr) - for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { + var lastDeleteError error + err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) { if err = ctrl.provisioner.Delete(volume); err == nil { // Delete succeeded glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name)) - break + return true, nil } // Delete failed, try again after a while. glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err)) - time.Sleep(ctrl.createProvisionedPVInterval) - } - + lastDeleteError = err + return false, nil + }) if err != nil { // Delete failed several times. There is an orphaned volume and there // is nothing we can do about it. - strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err) + strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError) glog.Error(logOperation(operation, strerr)) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr) }