pbacsko commented on code in PR #890:
URL: https://github.com/apache/yunikorn-k8shim/pull/890#discussion_r1716854876
##########
pkg/cache/context_test.go:
##########
@@ -2433,3 +2433,111 @@ func assertListerPods(pods []*v1.Pod, count int) bool {
}
return count == counted
}
+
+type MockRetryStrategy struct {
+ totalSleep time.Duration
+}
+
+func (m *MockRetryStrategy) Sleep(duration time.Duration) {
+ m.totalSleep += duration
+}
Review Comment:
Extra code that is not needed: with the retry helper suggested below, no mock retry strategy is required.
##########
pkg/cache/context.go:
##########
@@ -754,20 +755,73 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// convert nil to empty array
volumes.DynamicProvisions =
make([]*v1.PersistentVolumeClaim, 0)
}
- err =
ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(),
assumedPod, volumes)
- if err != nil {
- log.Log(log.ShimContext).Error("Failed to bind
pod volumes",
- zap.String("podName", assumedPod.Name),
- zap.String("nodeName",
assumedPod.Spec.NodeName),
- zap.Int("dynamicProvisions",
len(volumes.DynamicProvisions)),
- zap.Int("staticBindings",
len(volumes.StaticBindings)))
- return err
- }
+ // Here we set the max retry count to 5, and use the
default retry strategy
+ // Details:
+ // max sleep time is 1s, 2s, 4s, 8s and the last one
will not sleep
+ return ctx.bindPodVolumesWithRetry(assumedPod, volumes,
5, &DefaultRetryStrategy{})
}
}
return nil
}
+type RetryStrategy interface {
+ // Sleep function used for retry delays
+ Sleep(duration time.Duration)
+}
+
+// DefaultRetryStrategy is a simple retry strategy that sleeps for a fixed
duration
+// We can extend this to support more advanced retry strategies in the future
and also for testing purposes
+type DefaultRetryStrategy struct{}
+
+func (r *DefaultRetryStrategy) Sleep(duration time.Duration) {
+ time.Sleep(duration)
+}
Review Comment:
A lot of extra code that is not needed; see below.
##########
pkg/cache/context.go:
##########
@@ -754,20 +755,73 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
// convert nil to empty array
volumes.DynamicProvisions =
make([]*v1.PersistentVolumeClaim, 0)
}
- err =
ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(),
assumedPod, volumes)
- if err != nil {
- log.Log(log.ShimContext).Error("Failed to bind
pod volumes",
- zap.String("podName", assumedPod.Name),
- zap.String("nodeName",
assumedPod.Spec.NodeName),
- zap.Int("dynamicProvisions",
len(volumes.DynamicProvisions)),
- zap.Int("staticBindings",
len(volumes.StaticBindings)))
- return err
- }
+ // Here we set the max retry count to 5, and use the
default retry strategy
+ // Details:
+ // max sleep time is 1s, 2s, 4s, 8s and the last one
will not sleep
+ return ctx.bindPodVolumesWithRetry(assumedPod, volumes,
5, &DefaultRetryStrategy{})
}
}
return nil
}
+type RetryStrategy interface {
+ // Sleep function used for retry delays
+ Sleep(duration time.Duration)
+}
+
+// DefaultRetryStrategy is a simple retry strategy that sleeps for a fixed
duration
+// We can extend this to support more advanced retry strategies in the future
and also for testing purposes
+type DefaultRetryStrategy struct{}
+
+func (r *DefaultRetryStrategy) Sleep(duration time.Duration) {
+ time.Sleep(duration)
+}
+
+func (ctx *Context) bindPodVolumesWithRetry(
+ assumedPod *v1.Pod,
+ volumes *volumebinding.PodVolumes,
+ maxRetries int,
+ retryStrategy RetryStrategy,
+) error {
+ const baseDelay = time.Second
+ const maxDelay = 8 * time.Second
+
+ var err error
+ for i := 0; i < maxRetries; i++ {
+ err =
ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(),
assumedPod, volumes)
+ if err == nil {
+ return nil
+ }
+
+ log.Log(log.ShimContext).Error("Failed to bind pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName", assumedPod.Spec.NodeName),
+ zap.Int("dynamicProvisions",
len(volumes.DynamicProvisions)),
+ zap.Int("staticBindings", len(volumes.StaticBindings)),
+ zap.Int("retryCount", i+1),
+ zap.Error(err))
+
+ if i == maxRetries-1 {
+ log.Log(log.ShimContext).Error("Failed to bind pod
volumes after retry",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("dynamicProvisions",
len(volumes.DynamicProvisions)),
+ zap.Int("staticBindings",
len(volumes.StaticBindings)),
+ zap.Error(err))
+ return err
+ }
+
+ delay := baseDelay * time.Duration(1<<uint(i))
+ if delay > maxDelay {
+ delay = maxDelay
+ }
+
+ retryStrategy.Sleep(delay) // Use the retry strategy
+ }
Review Comment:
There is retry logic in the K8s codebase that we can reuse:
```
import (
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
)

backoff := wait.Backoff{
	Steps:    5,
	Duration: time.Second,
	Factor:   2.0,
	Jitter:   0,
}
err := retry.OnError(backoff, func(_ error) bool {
	return true // retry on all errors
}, func() error {
	return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
})
```
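There are `retry.DefaultRetry` and `retry.DefaultBackoff`, but those use 10ms-based delays and don't look suitable for us. Ignoring the time spent in the calls themselves, this makes 5 attempts with a total wait of 15 seconds (1s + 2s + 4s + 8s), because no sleep happens after the last attempt.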
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]