craigcondit commented on a change in pull request #359:
URL:
https://github.com/apache/incubator-yunikorn-k8shim/pull/359#discussion_r791158077
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
+ if volumes.StaticBindings == nil {
Review comment:
This is the actual fix (ensure these slices are empty rather than nil).
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
+ if volumes.StaticBindings == nil {
+ // convert nil to empty array
+ volumes.StaticBindings =
make([]*scheduling.BindingInfo, 0)
+ }
+ if volumes.DynamicProvisions == nil {
+ // convert nil to empty array
+ volumes.DynamicProvisions =
make([]*v1.PersistentVolumeClaim, 0)
+ }
Review comment:
This is the actual fix -- don't allow the slices within volumes to be
nil.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
Review comment:
This is based on upstream K8s; if the pod has unbound immediate claims, we
can't bind its volumes, so we must fail here.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
Review comment:
This was added based on upstream K8s. If any reasons are returned from
FindPodVolumes, there are conflicts and we must fail.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
+ if volumes.StaticBindings == nil {
+ // convert nil to empty array
+ volumes.StaticBindings =
make([]*scheduling.BindingInfo, 0)
+ }
+ if volumes.DynamicProvisions == nil {
+ // convert nil to empty array
+ volumes.DynamicProvisions =
make([]*v1.PersistentVolumeClaim, 0)
+ }
Review comment:
> We need tests for this code path.
I don't disagree. However, currently, the entirety of volume binding is
disabled in test mode. Mocking out the proper interactions is going to be a
pretty major undertaking.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
Review comment:
Exactly.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
Review comment:
I agree; we probably need to test for this earlier. I'll work on getting
this check to happen earlier as well. Your suggestion as to where makes sense --
I will investigate that.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
Review comment:
We probably do. I missed that one.
##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes",
zap.String("podName", pod.Name))
- boundClaims, claimsToBind, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+ boundClaims, claimsToBind, unboundClaimsImmediate, err
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
+ log.Logger().Error("Failed to get pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
+ return err
+ }
+ if len(unboundClaimsImmediate) > 0 {
+ err = fmt.Errorf("pod %s has unbound immediate
claims", pod.Name)
+ log.Logger().Error("Pod has unbound immediate
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.Error(err))
return err
}
node, err :=
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
+ log.Logger().Error("Failed to get node info",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Error(err))
+ return err
+ }
+ volumes, reasons, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if err != nil {
+ log.Logger().Error("Failed to find pod volumes",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
return err
}
- volumes, _, err :=
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims,
claimsToBind, node)
+ if len(reasons) > 0 {
+ sReasons := make([]string, 0)
+ for _, reason := range reasons {
+ sReasons = append(sReasons,
string(reason))
+ }
+ sReason := strings.Join(sReasons, ", ")
+ err = fmt.Errorf("pod %s has conflicting volume
claims: %s", pod.Name, sReason)
+ log.Logger().Error("Pod has conflicting volume
claims",
+ zap.String("podName", assumedPod.Name),
+ zap.String("nodeName",
assumedPod.Spec.NodeName),
+ zap.Int("claimsToBind",
len(claimsToBind)),
+ zap.Error(err))
+ return err
+ }
+ if volumes.StaticBindings == nil {
+ // convert nil to empty array
+ volumes.StaticBindings =
make([]*scheduling.BindingInfo, 0)
+ }
+ if volumes.DynamicProvisions == nil {
+ // convert nil to empty array
+ volumes.DynamicProvisions =
make([]*v1.PersistentVolumeClaim, 0)
+ }
Review comment:
> We need tests for this code path.
I don't disagree. However, currently, the entirety of volume binding is
disabled in test mode. Mocking out the proper interactions is going to be a
pretty major undertaking. Any ideas how best to proceed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org