craigcondit commented on a change in pull request #359:
URL: 
https://github.com/apache/incubator-yunikorn-k8shim/pull/359#discussion_r791158077



##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if volumes.StaticBindings == nil {

Review comment:
       This is the actual fix (ensure these slices are empty rather than nil).

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if volumes.StaticBindings == nil {
+                               // convert nil to empty array
+                               volumes.StaticBindings = 
make([]*scheduling.BindingInfo, 0)
+                       }
+                       if volumes.DynamicProvisions == nil {
+                               // convert nil to empty array
+                               volumes.DynamicProvisions = 
make([]*v1.PersistentVolumeClaim, 0)
+                       }

Review comment:
       This is the actual fix: don't allow the slices within `volumes` to be nil; convert them to empty slices instead.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }

Review comment:
       This mirrors upstream K8s: if the pod has unbound immediate claims, its volume requirements can't be satisfied, so we must fail.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }

Review comment:
       This was added to match upstream K8s: if FindPodVolumes returns any reasons, there are conflicting volume claims and we must fail.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if volumes.StaticBindings == nil {
+                               // convert nil to empty array
+                               volumes.StaticBindings = 
make([]*scheduling.BindingInfo, 0)
+                       }
+                       if volumes.DynamicProvisions == nil {
+                               // convert nil to empty array
+                               volumes.DynamicProvisions = 
make([]*v1.PersistentVolumeClaim, 0)
+                       }

Review comment:
       > We need tests for this code path.
   
   I don't disagree. However, volume binding is currently disabled entirely in test mode, and mocking out the required interactions would be a major undertaking.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }

Review comment:
       Exactly.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))

Review comment:
       I agree; we should probably check for this condition earlier. I'll work on moving the check earlier in the flow as well. Your suggestion about where to put it makes sense, and I will investigate it.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)

Review comment:
       We probably do. I missed that one.

##########
File path: pkg/cache/context.go
##########
@@ -365,19 +366,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
                                zap.String("podName", pod.Name))
                } else {
                        log.Logger().Info("Binding Pod Volumes", 
zap.String("podName", pod.Name))
-                       boundClaims, claimsToBind, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
+                       boundClaims, claimsToBind, unboundClaimsImmediate, err 
:= ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
                        if err != nil {
+                               log.Logger().Error("Failed to get pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if len(unboundClaimsImmediate) > 0 {
+                               err = fmt.Errorf("pod %s has unbound immediate 
claims", pod.Name)
+                               log.Logger().Error("Pod has unbound immediate 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.Error(err))
                                return err
                        }
                        node, err := 
ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
                        if err != nil {
+                               log.Logger().Error("Failed to get node info",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Error(err))
+                               return err
+                       }
+                       volumes, reasons, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if err != nil {
+                               log.Logger().Error("Failed to find pod volumes",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
                                return err
                        }
-                       volumes, _, err := 
ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, 
claimsToBind, node)
+                       if len(reasons) > 0 {
+                               sReasons := make([]string, 0)
+                               for _, reason := range reasons {
+                                       sReasons = append(sReasons, 
string(reason))
+                               }
+                               sReason := strings.Join(sReasons, ", ")
+                               err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
+                               log.Logger().Error("Pod has conflicting volume 
claims",
+                                       zap.String("podName", assumedPod.Name),
+                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.Int("claimsToBind", 
len(claimsToBind)),
+                                       zap.Error(err))
+                               return err
+                       }
+                       if volumes.StaticBindings == nil {
+                               // convert nil to empty array
+                               volumes.StaticBindings = 
make([]*scheduling.BindingInfo, 0)
+                       }
+                       if volumes.DynamicProvisions == nil {
+                               // convert nil to empty array
+                               volumes.DynamicProvisions = 
make([]*v1.PersistentVolumeClaim, 0)
+                       }

Review comment:
       > We need tests for this code path.
   
   I don't disagree. However, volume binding is currently disabled entirely in test mode, and mocking out the required interactions would be a major undertaking. Any ideas on how best to proceed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to