[GitHub] [incubator-yunikorn-k8shim] pbacsko commented on a change in pull request #336: [YUNIKORN-971] Implement YuniKorn as a Kubernetes scheduler plugin.
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r787549205

## File path: pkg/schedulerplugin/scheduler_plugin.go ##
@@ -0,0 +1,209 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package schedulerplugin
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/informers"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/entrypoint"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/cache"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	pluginconf "github.com/apache/incubator-yunikorn-k8shim/pkg/schedulerplugin/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/shim"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/api"
+)
+
+const (
+	SchedulerPluginName = "YuniKornPlugin"
+)
+
+// YuniKornSchedulerPlugin provides an implementation of several lifecycle methods of the Kubernetes scheduling framework:
+// https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
+//
+// PreFilter: Used to notify the default scheduler that a particular pod has been marked ready for scheduling by YuniKorn
+//
+// Filter: Used to notify the default scheduler that a particular pod/node combination is ready to be scheduled
+//
+// PostBind: Used to notify YuniKorn that a pod has been scheduled successfully
+//
+// Pod Allocations:
+//
+// The YuniKorn scheduler is always running in the background, making decisions about which pods to allocate to which
+// nodes. When a decision is made, that pod is marked as having a "pending" pod allocation, which means YuniKorn has
+// allocated the pod, but the default scheduler (via the plugin interface) has not yet been notified.
+//
+// Once PreFilter() has been called for a particular pod, that allocation is marked as "in progress" meaning it has been
+// communicated to the default scheduler, but has not yet been fulfilled.
+//
+// Finally, in PostBind(), the allocation is removed as we now know that the pod has been allocated successfully.
+// If a pending or in-progress allocation is detected for a pod in PreFilter(), we remove the allocation and force the
+// pod to be rescheduled, as this means the prior allocation could not be completed successfully by the default
+// scheduler for some reason.
+type YuniKornSchedulerPlugin struct {

Review comment:
Solid comment, it's great to have this here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@yunikorn.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
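The allocation lifecycle described in the doc comment of the hunk above (pending, then in progress, then removed in PostBind(), with a leftover in-progress entry forcing a reschedule) can be read as a small state machine. The sketch below is a minimal model under stated assumptions: it keys state by pod UID, follows the doc comment in making PreFilter() the pending-to-in-progress transition, and ignores node matching and locking. The `allocState` and `lifecycle` types and the `onAllocated`/`onPreFilter`/`onPostBind` helpers are hypothetical and not part of the YuniKorn code.

```go
package main

import "fmt"

// allocState models the allocation lifecycle from the plugin doc comment.
// Hypothetical type, for illustration only.
type allocState int

const (
	none       allocState = iota // no allocation recorded for the pod
	pending                      // YuniKorn allocated the pod; default scheduler not notified yet
	inProgress                   // handed to the default scheduler, not yet bound
)

// lifecycle is a hypothetical stand-in for the allocation bookkeeping.
type lifecycle struct {
	state map[string]allocState // pod UID -> state
}

func newLifecycle() *lifecycle {
	return &lifecycle{state: make(map[string]allocState)}
}

// onAllocated records a YuniKorn scheduling decision as pending.
func (l *lifecycle) onAllocated(uid string) { l.state[uid] = pending }

// onPreFilter reports whether the pod may proceed. A pod that is already in
// progress is coming back after a failed attempt, so its allocation is
// dropped and the pod has to be rescheduled.
func (l *lifecycle) onPreFilter(uid string) bool {
	switch l.state[uid] {
	case pending:
		l.state[uid] = inProgress
		return true
	case inProgress:
		delete(l.state, uid) // stale attempt: force a reschedule
		return false
	default:
		return false // YuniKorn has not allocated this pod yet
	}
}

// onPostBind removes the allocation once binding has succeeded.
func (l *lifecycle) onPostBind(uid string) { delete(l.state, uid) }

func main() {
	l := newLifecycle()
	l.onAllocated("pod-a")
	fmt.Println(l.onPreFilter("pod-a")) // true: pending -> in progress
	l.onPostBind("pod-a")
	fmt.Println(len(l.state)) // 0

	l.onAllocated("pod-b")
	l.onPreFilter("pod-b")              // first attempt
	fmt.Println(l.onPreFilter("pod-b")) // false: previous attempt never reached PostBind
}
```

Modelling the states explicitly makes it easy to see that "in progress" is only ever reached from "pending", which is also the ordering the reviewer later asks to have documented on the cache fields.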
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r787548184

## File path: pkg/schedulerplugin/scheduler_plugin.go ##
@@ -0,0 +1,209 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package schedulerplugin
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/informers"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/entrypoint"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/cache"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	pluginconf "github.com/apache/incubator-yunikorn-k8shim/pkg/schedulerplugin/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/shim"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/api"
+)
+
+const (
+	SchedulerPluginName = "YuniKornPlugin"
+)
+
+// YuniKornSchedulerPlugin provides an implementation of several lifecycle methods of the Kubernetes scheduling framework:
+// https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
+//
+// PreFilter: Used to notify the default scheduler that a particular pod has been marked ready for scheduling by YuniKorn
+//
+// Filter: Used to notify the default scheduler that a particular pod/node combination is ready to be scheduled
+//
+// PostBind: Used to notify YuniKorn that a pod has been scheduled successfully
+//
+// Pod Allocations:
+//
+// The YuniKorn scheduler is always running in the background, making decisions about which pods to allocate to which
+// nodes. When a decision is made, that pod is marked as having a "pending" pod allocation, which means YuniKorn has
+// allocated the pod, but the default scheduler (via the plugin interface) has not yet been notified.
+//
+// Once PreFilter() has been called for a particular pod, that allocation is marked as "in progress" meaning it has been
+// communicated to the default scheduler, but has not yet been fulfilled.
+//
+// Finally, in PostBind(), the allocation is removed as we now know that the pod has been allocated successfully.
+// If a pending or in-progress allocation is detected for a pod in PreFilter(), we remove the allocation and force the
+// pod to be rescheduled, as this means the prior allocation could not be completed successfully by the default
+// scheduler for some reason.
+type YuniKornSchedulerPlugin struct {
+	sync.RWMutex
+	context *cache.Context
+}
+
+// ensure all required interfaces are implemented
+var _ framework.PreFilterPlugin = &YuniKornSchedulerPlugin{}
+var _ framework.FilterPlugin = &YuniKornSchedulerPlugin{}
+var _ framework.PostBindPlugin = &YuniKornSchedulerPlugin{}
+
+// Name returns the name of the plugin
+func (sp *YuniKornSchedulerPlugin) Name() string {
+	return SchedulerPluginName
+}
+
+// PreFilter is used to release pods to scheduler
+func (sp *YuniKornSchedulerPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
+	log.Logger().Debug("PreFilter check",
+		zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)))
+
+	// we don't process pods without appID defined
+	appID, err := utils.GetApplicationIDFromPod(pod)
+	if err != nil {
+		log.Logger().Info(fmt.Sprintf("Skipping pod %s/%s in the prefilter plugin because no applicationID is defined",
+			pod.Namespace, pod.Name))
+		return framework.NewStatus(framework.Success, "Deferring to default scheduler")
+	}
+
+	if app := sp.context.GetApplication(appID); app != nil {
+		if task, err := app.GetTask(string(pod.UID)); err == nil {
+			_, ok := sp.context.GetInProgressPodAllocation(string(pod.UID))
+
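The `var _ framework.PreFilterPlugin = &YuniKornSchedulerPlugin{}` lines in the hunk above use Go's compile-time interface assertion: if the type stops satisfying the interface, the build fails instead of the problem surfacing at runtime. A self-contained sketch of the same idiom, using a hypothetical `Plugin` interface and `examplePlugin` type in place of the real `framework` types:

```go
package main

import "fmt"

// Plugin is a hypothetical stand-in for an interface such as
// framework.PreFilterPlugin; it is not the real Kubernetes type.
type Plugin interface {
	Name() string
}

type examplePlugin struct{}

func (p *examplePlugin) Name() string { return "ExamplePlugin" }

// Compile-time check: removing Name() above turns this line into a build
// error. Assigning to the blank identifier means no variable is kept.
var _ Plugin = &examplePlugin{}

func main() {
	var p Plugin = &examplePlugin{}
	fmt.Println(p.Name())
}
```

A common variant, `var _ Plugin = (*examplePlugin)(nil)`, performs the same compile-time check without constructing a value.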
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r787539000

## File path: pkg/cache/external/scheduler_cache.go ##
@@ -36,23 +36,23 @@ import (
 // nodes are cached in the form of de-scheduler nodeInfo, instead of re-creating all nodes info from scratch,
 // we replicate nodes info from de-scheduler, in order to re-use predicates functions.
 type SchedulerCache struct {
-	// node name to NodeInfo map
-	nodesMap map[string]*framework.NodeInfo
-	podsMap map[string]*v1.Pod
-	// this is a map of assumed pods,
-	// the value indicates if a pod volumes are all bound
-	assumedPods map[string]bool
-	lock sync.RWMutex
-	// client APIs
-	clients *client.Clients
+	nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map
+	podsMap map[string]*v1.Pod
+	assumedPods map[string]bool // map of assumed pods, value indicates if pod volumes are all bound
+	pendingAllocations map[string]string // map of pod to node ID, presence indicates a pending allocation for scheduler
+	inProgressAllocations map[string]string // map of pod to node ID, presence indicates an in-process allocation for scheduler

Review comment:
Thanks, this definitely helps to understand the code.
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r783059490

## File path: pkg/cache/external/scheduler_cache.go ##
@@ -132,6 +132,45 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) error {
 	return nil
 }
 
+func (cache *SchedulerCache) AddPendingPodAllocation(podKey string, nodeID string) {
+	cache.lock.Lock()
+	defer cache.lock.Unlock()
+	delete(cache.inProgressAllocations, podKey)

Review comment:
Is this delete necessary? Under what circumstances can there be an entry here? As far as I can see, `inProgressAllocations` is populated by `StartPodAllocation()`, but that already requires a pending entry, which we only add on line 139. Am I missing something? Is it a bug if `inProgressAllocations` already has an entry for the same `podKey` at this point?
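One way to reason about the question above is to assume that `StartPodAllocation()` moves an entry from the pending map to the in-progress map. The quoted hunks do not show whether it deletes the pending entry, so this is an assumption. Under that assumption, a pod whose bind failed keeps a stale in-progress entry until something removes it. The PreFilter code in this PR calls `RemovePendingPodAllocation()`, which clears both maps, so the delete in `AddPendingPodAllocation()` looks defensive: it would only matter if a new pending allocation were recorded before that cleanup runs. A minimal sketch of that scenario; the `allocCache` type, its methods, and `staleAfterRetry` are hypothetical and omit locking:

```go
package main

import "fmt"

// allocCache is a hypothetical, lock-free stand-in for the two maps in
// SchedulerCache (pod key -> node ID).
type allocCache struct {
	pending    map[string]string
	inProgress map[string]string
}

func newAllocCache() *allocCache {
	return &allocCache{pending: map[string]string{}, inProgress: map[string]string{}}
}

// addPending mirrors AddPendingPodAllocation; the delete is optional here so
// both behaviours can be compared.
func (c *allocCache) addPending(pod, node string, clearInProgress bool) {
	if clearInProgress {
		delete(c.inProgress, pod)
	}
	c.pending[pod] = node
}

// start ASSUMES StartPodAllocation moves the entry from pending to in-progress.
func (c *allocCache) start(pod, node string) bool {
	if expected, ok := c.pending[pod]; ok && expected == node {
		delete(c.pending, pod)
		c.inProgress[pod] = node
		return true
	}
	return false
}

// staleAfterRetry simulates: allocate, start, bind fails without cleanup,
// YuniKorn re-allocates. It reports whether an in-progress entry survives.
func staleAfterRetry(clearInProgress bool) bool {
	c := newAllocCache()
	c.addPending("pod-a", "node-1", clearInProgress)
	c.start("pod-a", "node-1")
	// the bind fails here and nothing removes the in-progress entry
	c.addPending("pod-a", "node-2", clearInProgress)
	_, stale := c.inProgress["pod-a"]
	return stale
}

func main() {
	fmt.Println(staleAfterRetry(true))  // false
	fmt.Println(staleAfterRetry(false)) // true
}
```

Without the delete, the next PreFilter for that pod would find the stale in-progress entry and reject the pod again, even though YuniKorn has just made a fresh decision.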
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r783047016

## File path: pkg/schedulerplugin/scheduler_plugin.go ##
@@ -0,0 +1,192 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package schedulerplugin
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/informers"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/entrypoint"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/cache"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/log"
+	pluginconf "github.com/apache/incubator-yunikorn-k8shim/pkg/schedulerplugin/conf"
+	"github.com/apache/incubator-yunikorn-k8shim/pkg/shim"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/api"
+)
+
+const (
+	SchedulerPluginName = "YuniKornPlugin"
+)
+
+type YuniKornSchedulerPlugin struct {
+	sync.RWMutex
+	context *cache.Context
+}
+
+// ensure all required interfaces are implemented
+var _ framework.PreFilterPlugin = &YuniKornSchedulerPlugin{}
+var _ framework.FilterPlugin = &YuniKornSchedulerPlugin{}
+var _ framework.PostBindPlugin = &YuniKornSchedulerPlugin{}
+
+// Plugin
+func (sp *YuniKornSchedulerPlugin) Name() string {
+	return SchedulerPluginName
+}
+
+// PreFilterPlugin: PreFilter is used to release pods to scheduler
+func (sp *YuniKornSchedulerPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
+	log.Logger().Debug("PreFilter check",
+		zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)))
+
+	// we don't process pods without appID defined
+	appID, err := utils.GetApplicationIDFromPod(pod)
+	if err != nil {
+		log.Logger().Info(fmt.Sprintf("Skipping pod %s/%s in the prefilter plugin because no applicationID is defined",
+			pod.Namespace, pod.Name))
+		return framework.NewStatus(framework.Success, "Deferring to default scheduler")
+	}
+
+	if app := sp.context.GetApplication(appID); app != nil {
+		if task, err := app.GetTask(string(pod.UID)); err == nil {
+			_, ok := sp.context.GetInProgressPodAllocation(string(pod.UID))
+			if ok {
+				// pod must have failed scheduling, reject it and return unschedulable
+				log.Logger().Info("Task failed scheduling, marking as rejected",
+					zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
+					zap.String("taskID", task.GetTaskID()))
+				sp.context.RemovePendingPodAllocation(string(pod.UID))
+				dispatcher.Dispatch(cache.NewRejectTaskEvent(app.GetApplicationID(), task.GetTaskID(),
+					fmt.Sprintf("task %s rejected by scheduler", task.GetTaskID())))
+				return framework.NewStatus(framework.Unschedulable, "Pod is not ready for scheduling")
+			}
+
+			nodeID, ok := sp.context.GetPendingPodAllocation(string(pod.UID))
+			if !ok {
+				nodeID = ""

Review comment:
Is this value used for anything?
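On the question above: a Go map lookup in the comma-ok form already yields the value type's zero value when the key is missing. The `SchedulerCache.GetPendingPodAllocation()` hunk in this PR returns `cache.pendingAllocations[podKey]` directly, so assuming `sp.context.GetPendingPodAllocation()` passes that result through unchanged, `nodeID` is already `""` whenever `ok` is false and the explicit assignment is redundant. A self-contained illustration; `lookup` is a hypothetical helper with the same shape:

```go
package main

import "fmt"

// lookup mirrors the shape of GetPendingPodAllocation: it returns the map
// value and whether the key was present.
func lookup(m map[string]string, key string) (string, bool) {
	v, ok := m[key]
	return v, ok
}

func main() {
	pending := map[string]string{"pod-a": "node-1"}

	nodeID, ok := lookup(pending, "pod-b")
	// For a missing key the value is already the zero value of string,
	// so an extra `if !ok { nodeID = "" }` changes nothing.
	fmt.Printf("%q %v\n", nodeID, ok) // "" false
}
```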
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r783039045

## File path: pkg/cache/external/scheduler_cache.go ##
@@ -132,6 +132,45 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) error {
 	return nil
 }
 
+func (cache *SchedulerCache) AddPendingPodAllocation(podKey string, nodeID string) {
+	cache.lock.Lock()
+	defer cache.lock.Unlock()
+	delete(cache.inProgressAllocations, podKey)
+	cache.pendingAllocations[podKey] = nodeID
+}
+
+func (cache *SchedulerCache) RemovePendingPodAllocation(podKey string) {
+	cache.lock.Lock()
+	defer cache.lock.Unlock()
+	delete(cache.pendingAllocations, podKey)
+	delete(cache.inProgressAllocations, podKey)
+}
+
+func (cache *SchedulerCache) GetPendingPodAllocation(podKey string) (nodeID string, ok bool) {
+	cache.lock.RLock()
+	defer cache.lock.RUnlock()
+	res, ok := cache.pendingAllocations[podKey]
+	return res, ok
+}
+
+func (cache *SchedulerCache) GetInProgressPodAllocation(podKey string) (nodeID string, ok bool) {
+	cache.lock.RLock()
+	defer cache.lock.RUnlock()
+	res, ok := cache.inProgressAllocations[podKey]
+	return res, ok
+}
+
+func (cache *SchedulerCache) StartPodAllocation(podKey string, nodeID string) bool {
+	cache.lock.Lock()
+	defer cache.lock.Unlock()
+	expectedNodeID, ok := cache.pendingAllocations[podKey]
+	if ok && expectedNodeID == nodeID {

Review comment:
Ok, from what I understand, `expectedNodeID` comes from YK itself and `nodeID` comes from the default scheduler. Questions:
* How often do these two mismatch?
* What happens if we return `false` here?
* Can YK and the default scheduler end up repeatedly allocating to different nodes?

Another thing: do we even need to consider YK's allocation? In `task.postTaskAllocated()`, the original code path calls `KubeClient.Bind()`, but in plugin mode, as far as I can see, we delegate this operation.
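To make the questions above concrete: the quoted check only succeeds when the node proposed by the default scheduler equals the node YuniKorn recorded as pending. The hunk is cut off after the `if`, so the sketch below assumes that a successful start moves the entry into the in-progress map and that a mismatch leaves both maps untouched. `podAllocations` and its methods are hypothetical; the type keeps the `sync.RWMutex` locking pattern of `SchedulerCache`.

```go
package main

import (
	"fmt"
	"sync"
)

// podAllocations is a hypothetical stand-in for the allocation maps in
// SchedulerCache (pod key -> node ID).
type podAllocations struct {
	lock       sync.RWMutex
	pending    map[string]string
	inProgress map[string]string
}

func newPodAllocations() *podAllocations {
	return &podAllocations{pending: map[string]string{}, inProgress: map[string]string{}}
}

func (a *podAllocations) addPending(podKey, nodeID string) {
	a.lock.Lock()
	defer a.lock.Unlock()
	a.pending[podKey] = nodeID
}

// start succeeds only if nodeID (the default scheduler's candidate) matches
// the node YuniKorn chose. ASSUMPTION: on success the entry is moved.
func (a *podAllocations) start(podKey, nodeID string) bool {
	a.lock.Lock()
	defer a.lock.Unlock()
	expected, ok := a.pending[podKey]
	if ok && expected == nodeID {
		delete(a.pending, podKey)
		a.inProgress[podKey] = nodeID
		return true
	}
	return false
}

func main() {
	a := newPodAllocations()
	a.addPending("pod-a", "node-1")
	fmt.Println(a.start("pod-a", "node-2")) // false: default scheduler is evaluating a different node
	fmt.Println(a.start("pod-a", "node-1")) // true: nodes agree
}
```

If, as the plugin doc comment suggests, this check backs `Filter()`, which the scheduling framework calls once per candidate node, then a `false` would simply filter out that node, and the pod could only land on the node YuniKorn picked. That reading is an inference from the doc comment, not something the quoted hunk confirms.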
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r782996743

## File path: pkg/cache/external/scheduler_cache.go ##
@@ -36,23 +36,23 @@ import (
 // nodes are cached in the form of de-scheduler nodeInfo, instead of re-creating all nodes info from scratch,
 // we replicate nodes info from de-scheduler, in order to re-use predicates functions.
 type SchedulerCache struct {
-	// node name to NodeInfo map
-	nodesMap map[string]*framework.NodeInfo
-	podsMap map[string]*v1.Pod
-	// this is a map of assumed pods,
-	// the value indicates if a pod volumes are all bound
-	assumedPods map[string]bool
-	lock sync.RWMutex
-	// client APIs
-	clients *client.Clients
+	nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map
+	podsMap map[string]*v1.Pod
+	assumedPods map[string]bool // map of assumed pods, value indicates if pod volumes are all bound
+	pendingAllocations map[string]string // map of pod to node ID, presence indicates a pending allocation for scheduler
+	inProgressAllocations map[string]string // map of pod to node ID, presence indicates an in-process allocation for scheduler

Review comment:
We need an extra comment explaining the difference between a "pending" and an "in progress" allocation. Just by looking at the code, it's not obvious which comes first or what the lifecycle of the entries here is.
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r783005657

## File path: pkg/cache/task.go ##
@@ -440,6 +460,23 @@ func (task *Task) beforeTaskAllocated(event *fsm.Event) {
 }
 
 func (task *Task) postTaskBound(event *fsm.Event) {
+	if task.pluginMode {
+		// when the pod is scheduling by yunikorn, it is moved to the default-scheduler's
+		// unschedulable queue, if nothing changes, the pod will be staying in the unschedulable
+		// queue for unschedulableQTimeInterval long (default 1 minute). hence, we are updating
+		// the pod status explicitly, when there is a status change, the default scheduler will
+		// move the pod back to the active queue immediately.
+		podCopy := task.pod.DeepCopy()

Review comment:
Can we reason about the performance effects of this copy? If we do this all the time (and that seems to be what's happening), we end up creating extra objects, which affects GC time. Maybe not a big deal, just something to think about.
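One way to bound the cost raised above is to deep-copy the pod only when the status update would actually change something, and to skip the copy (and the API update) otherwise. The sketch below is a suggestion, not the PR's code: `pod` and `condition` are hypothetical, trimmed-down stand-ins for `v1.Pod` and `v1.PodCondition` (the real types have generated `DeepCopy()` methods), and the hunk does not show whether the condition can already be present when `postTaskBound()` runs.

```go
package main

import "fmt"

// condition and pod are hypothetical, trimmed-down stand-ins for
// v1.PodCondition and v1.Pod.
type condition struct {
	Type   string
	Status string
}

type pod struct {
	Name       string
	Conditions []condition
}

// deepCopy copies the struct and its slice so the copy can be mutated safely.
func (p *pod) deepCopy() *pod {
	c := *p
	c.Conditions = append([]condition(nil), p.Conditions...)
	return &c
}

// withCondition returns a modified copy and true if cond is new or differs
// from the existing condition of the same type. If an identical condition is
// already present, it returns the original pod and false without copying.
func withCondition(p *pod, cond condition) (*pod, bool) {
	for _, c := range p.Conditions {
		if c == cond {
			return p, false
		}
	}
	cp := p.deepCopy()
	for i := range cp.Conditions {
		if cp.Conditions[i].Type == cond.Type {
			cp.Conditions[i] = cond
			return cp, true
		}
	}
	cp.Conditions = append(cp.Conditions, cond)
	return cp, true
}

func main() {
	p := &pod{Name: "pod-a"}
	cond := condition{Type: "PodScheduled", Status: "False"}

	updated, changed := withCondition(p, cond)
	fmt.Println(changed, len(updated.Conditions)) // true 1

	again, changed := withCondition(updated, cond)
	fmt.Println(changed, again == updated) // false true
}
```

The caller would only issue the status update when the second return value is true, so repeated calls with an unchanged condition allocate nothing.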
pbacsko commented on a change in pull request #336:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/336#discussion_r782996743

## File path: pkg/cache/external/scheduler_cache.go ##
@@ -36,23 +36,23 @@ import (
 // nodes are cached in the form of de-scheduler nodeInfo, instead of re-creating all nodes info from scratch,
 // we replicate nodes info from de-scheduler, in order to re-use predicates functions.
 type SchedulerCache struct {
-	// node name to NodeInfo map
-	nodesMap map[string]*framework.NodeInfo
-	podsMap map[string]*v1.Pod
-	// this is a map of assumed pods,
-	// the value indicates if a pod volumes are all bound
-	assumedPods map[string]bool
-	lock sync.RWMutex
-	// client APIs
-	clients *client.Clients
+	nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map
+	podsMap map[string]*v1.Pod
+	assumedPods map[string]bool // map of assumed pods, value indicates if pod volumes are all bound
+	pendingAllocations map[string]string // map of pod to node ID, presence indicates a pending allocation for scheduler
+	inProgressAllocations map[string]string // map of pod to node ID, presence indicates an in-process allocation for scheduler

Review comment:
We need an extra comment explaining the difference between a "pending" and an "in progress" allocation. Just by looking at the code, it's not obvious which comes first or what the lifecycle of the pods is.