Merge pull request #16 from jsafrane/update-pv-save-backoff
Allow exponential backoff to save PVs to API server
k8s-ci-robot committed Jan 17, 2019
2 parents 1a00ed7 + ac882d9 commit 9bdb352
Showing 1 changed file with 83 additions and 37 deletions.
controller/controller.go: 120 changes (83 additions & 37 deletions)
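Usage sketch (illustrative, not part of the commit): with this change, a provisioner built on this library can opt into exponential backoff by passing the CreateProvisionedPVBackoff option, added in the diff below, to NewProvisionController instead of the older CreateProvisionedPVRetryCount/CreateProvisionedPVInterval pair. In the sketch, the import path, package name, client, provisioner and version arguments are placeholders for whatever the embedding provisioner already uses; only the option itself and the wait.Backoff fields come from this change.

package provisioner // placeholder package for the sketch

import (
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"

    "sigs.k8s.io/sig-storage-lib-external-provisioner/controller" // adjust to the import path your provisioner already uses
)

// newControllerWithBackoff wires the new option into the controller constructor.
func newControllerWithBackoff(client kubernetes.Interface, p controller.Provisioner, kubeVersion string) *controller.ProvisionController {
    return controller.NewProvisionController(
        client,
        "example.com/my-provisioner", // placeholder provisioner name
        p,
        kubeVersion,
        controller.CreateProvisionedPVBackoff(wait.Backoff{
            Duration: 2 * time.Second,  // delay before the first retry
            Factor:   2,                // double the delay after each failed save
            Steps:    5,                // at most 5 attempts to save the PV
            Cap:      30 * time.Second, // never wait longer than 30s between attempts
        }),
    )
}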
@@ -131,6 +131,7 @@ type ProvisionController struct {
     exponentialBackOffOnError bool
     threadiness               int
 
+    createProvisionedPVBackoff    *wait.Backoff
     createProvisionedPVRetryCount int
     createProvisionedPVInterval   time.Duration
 
@@ -244,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
         if c.HasRun() {
             return errRuntime
         }
+        if c.createProvisionedPVBackoff != nil {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+        }
         c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
         return nil
     }
@@ -256,11 +260,34 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
         if c.HasRun() {
             return errRuntime
         }
+        if c.createProvisionedPVBackoff != nil {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+        }
         c.createProvisionedPVInterval = createProvisionedPVInterval
         return nil
     }
 }
 
+// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
+// PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times.
+// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
+// can be used.
+func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
+    return func(c *ProvisionController) error {
+        if c.HasRun() {
+            return errRuntime
+        }
+        if c.createProvisionedPVRetryCount != 0 {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+        }
+        if c.createProvisionedPVInterval != 0 {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+        }
+        c.createProvisionedPVBackoff = &backoff
+        return nil
+    }
+}
+
 // FailedProvisionThreshold is the threshold for max number of retries on
 // failures of Provision. Defaults to 15.
 func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
@@ -452,34 +479,35 @@ func NewProvisionController(
     eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
 
     controller := &ProvisionController{
-        client:                        client,
-        provisionerName:               provisionerName,
-        provisioner:                   provisioner,
-        kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
-        id:                            id,
-        component:                     component,
-        eventRecorder:                 eventRecorder,
-        resyncPeriod:                  DefaultResyncPeriod,
-        exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
-        threadiness:                   DefaultThreadiness,
-        createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
-        createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
-        failedProvisionThreshold:      DefaultFailedProvisionThreshold,
-        failedDeleteThreshold:         DefaultFailedDeleteThreshold,
-        leaderElection:                DefaultLeaderElection,
-        leaderElectionNamespace:       getInClusterNamespace(),
-        leaseDuration:                 DefaultLeaseDuration,
-        renewDeadline:                 DefaultRenewDeadline,
-        retryPeriod:                   DefaultRetryPeriod,
-        metricsPort:                   DefaultMetricsPort,
-        metricsAddress:                DefaultMetricsAddress,
-        metricsPath:                   DefaultMetricsPath,
-        hasRun:                        false,
-        hasRunLock:                    &sync.Mutex{},
+        client:                    client,
+        provisionerName:           provisionerName,
+        provisioner:               provisioner,
+        kubeVersion:               utilversion.MustParseSemantic(kubeVersion),
+        id:                        id,
+        component:                 component,
+        eventRecorder:             eventRecorder,
+        resyncPeriod:              DefaultResyncPeriod,
+        exponentialBackOffOnError: DefaultExponentialBackOffOnError,
+        threadiness:               DefaultThreadiness,
+        failedProvisionThreshold:  DefaultFailedProvisionThreshold,
+        failedDeleteThreshold:     DefaultFailedDeleteThreshold,
+        leaderElection:            DefaultLeaderElection,
+        leaderElectionNamespace:   getInClusterNamespace(),
+        leaseDuration:             DefaultLeaseDuration,
+        renewDeadline:             DefaultRenewDeadline,
+        retryPeriod:               DefaultRetryPeriod,
+        metricsPort:               DefaultMetricsPort,
+        metricsAddress:            DefaultMetricsAddress,
+        metricsPath:               DefaultMetricsPath,
+        hasRun:                    false,
+        hasRunLock:                &sync.Mutex{},
     }
 
     for _, option := range options {
-        option(controller)
+        err := option(controller)
+        if err != nil {
+            glog.Fatalf("Error processing controller options: %s", err)
+        }
     }
 
     var rateLimiter workqueue.RateLimiter
@@ -500,6 +528,22 @@ func NewProvisionController(
     controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
     controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
+    if controller.createProvisionedPVBackoff == nil {
+        // Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
+        if controller.createProvisionedPVInterval == 0 {
+            controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
+        }
+        if controller.createProvisionedPVRetryCount == 0 {
+            controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
+        }
+        controller.createProvisionedPVBackoff = &wait.Backoff{
+            Duration: controller.createProvisionedPVInterval,
+            Factor:   1, // linear backoff
+            Steps:    controller.createProvisionedPVRetryCount,
+            Cap:      controller.createProvisionedPVInterval,
+        }
+    }
+
     informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
 
     // ----------------------
Expand Down Expand Up @@ -1077,47 +1121,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
}

// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastSaveError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
err = nil
} else {
glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
break
return true, nil
}
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
lastSaveError = err
return false, nil
})

if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastDeleteError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
break
return true, nil
}
// Delete failed, try again after a while.
glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}

lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
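How the rewritten retry loops above behave (background on k8s.io/apimachinery/pkg/util/wait, not part of the commit): wait.ExponentialBackoff calls the supplied condition function up to Steps times, sleeping Duration between attempts and multiplying the delay by Factor after each failure, bounded by Cap. Returning (true, nil) stops the retries, (false, nil) requests another attempt, and a non-nil error aborts immediately; if every attempt fails, the function itself returns a generic timeout error, which is why the code above stashes the real API error in lastSaveError/lastDeleteError and reports that instead. The standalone sketch below mimics the default assembled in NewProvisionController (Factor 1, i.e. the old linear "10 seconds, 5 times" behavior), scaled down to milliseconds and with the failing save simulated.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    // Shape of the default backoff built in NewProvisionController: Factor 1
    // keeps the delay constant, so this behaves like the old
    // "sleep createProvisionedPVInterval, retry createProvisionedPVRetryCount times"
    // loop. Durations are shrunk from 10s to 10ms so the demo finishes quickly.
    backoff := wait.Backoff{
        Duration: 10 * time.Millisecond,
        Factor:   1,
        Steps:    5,
        Cap:      10 * time.Millisecond,
    }

    attempts := 0
    var lastSaveError error
    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
        attempts++
        if attempts < 3 {
            // Stand-in for a failed PersistentVolumes().Create() call.
            lastSaveError = fmt.Errorf("simulated API error on attempt %d", attempts)
            return false, nil // back off, then try again
        }
        return true, nil // "PV saved", stop retrying
    })

    if err != nil {
        // If all 5 attempts had failed, err would be wait.ErrWaitTimeout;
        // the real cause is what was stashed in lastSaveError.
        fmt.Printf("gave up: %v (last save error: %v)\n", err, lastSaveError)
        return
    }
    fmt.Printf("saved after %d attempts\n", attempts)
}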
