Skip to content

Commit

Permalink
Allow exponential backoff to save PVs to API server
Browse files Browse the repository at this point in the history
Exponential backoff must be explicitly requested by provisioner.
  • Loading branch information
jsafrane authored and wongma7 committed Jan 18, 2019
1 parent c2c650d commit 0af807b
Showing 1 changed file with 82 additions and 37 deletions.
119 changes: 82 additions & 37 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type ProvisionController struct {
exponentialBackOffOnError bool
threadiness int

createProvisionedPVBackoff *wait.Backoff
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration

Expand Down Expand Up @@ -244,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
}
c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
return nil
}
Expand All @@ -256,11 +260,34 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
c.createProvisionedPVInterval = createProvisionedPVInterval
return nil
}
}

// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
// PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times.
// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
// can be used.
func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVRetryCount != 0 {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
}
if c.createProvisionedPVInterval != 0 {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
c.createProvisionedPVBackoff = &backoff
return nil
}
}

// FailedProvisionThreshold is the threshold for max number of retries on
// failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
Expand Down Expand Up @@ -452,34 +479,35 @@ func NewProvisionController(
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})

controller := &ProvisionController{
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
kubeVersion: utilversion.MustParseSemantic(kubeVersion),
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
createProvisionedPVInterval: DefaultCreateProvisionedPVInterval,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
hasRun: false,
hasRunLock: &sync.Mutex{},
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
kubeVersion: utilversion.MustParseSemantic(kubeVersion),
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
hasRun: false,
hasRunLock: &sync.Mutex{},
}

for _, option := range options {
option(controller)
err := option(controller)
if err != nil {
glog.Fatalf("Error processing controller options: %s", err)
}
}

var rateLimiter workqueue.RateLimiter
Expand All @@ -500,6 +528,21 @@ func NewProvisionController(
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
}
}

informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)

// ----------------------
Expand Down Expand Up @@ -1083,47 +1126,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
}

// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastSaveError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
err = nil
} else {
glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
break
return true, nil
}
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
lastSaveError = err
return false, nil
})

if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastDeleteError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
break
return true, nil
}
// Delete failed, try again after a while.
glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}

lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
Expand Down

0 comments on commit 0af807b

Please sign in to comment.