[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r428475700 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { Review comment: Based on the discussion, let's group un-named pods into one single app per namespace. This can resolve the issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r428228741 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { Review comment: Do we have any other way to resolve this problem? This is a blocker for me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427778852 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { Review comment: Basically, I think the `Running` state should not depend on the fact that a 2nd allocation has happened for the app. Instead, we should read the info from the app: each app can have a field, e.g. `org.apache.yunikorn/minTaskNum` (default=1). When this is 1, then once 1 allocation is made, we can say the app is `Running`. Later on, we can leverage this field for FIFO based Gang scheduling. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427774391 ## File path: pkg/cache/application_state.go ## @@ -31,76 +31,102 @@ import ( // -- // application events // -- -type ApplicationEvent int +type applicationEvent int const ( - AcceptApplication ApplicationEvent = iota - RejectApplication + AcceptApplication applicationEvent = iota + StartApplication RunApplication + WaitApplication + RejectApplication CompleteApplication KillApplication ) -func (ae ApplicationEvent) String() string { - return [...]string{"AcceptApplication", "RejectApplication", "RunApplication", "CompleteApplication", "KillApplication"}[ae] +func (ae applicationEvent) String() string { + return [...]string{"AcceptApplication", "StartApplication", "RunApplication", "WaitApplication", "RejectApplication", "CompleteApplication", "KillApplication"}[ae] } // -- // application states // -- -type ApplicationState int +type applicationState int const ( - New ApplicationState = iota + New applicationState = iota Accepted - Rejected + Starting Running + Waiting + Rejected Completed Killed Review comment: I find the meaning of these states really confusing. Can you please add comments here, and also in a doc? I think we need a design doc for this as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427773603 ## File path: pkg/scheduler/policies/sorting_policy.go ## @@ -0,0 +1,51 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package policies + +import ( + "fmt" +) + +// Sort type for queues & apps. +type SortPolicy int + +const ( + FifoSortPolicy SortPolicy = iota // fair based on usage + FairSortPolicy // first in first out, submit time + StateAwarePolicy // only 1 app in starting state + UndefinedApp // not initialised or parsing failed Review comment: UndefinedApp -> Undefined This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427770746 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { Review comment: One issue with this algorithm: if I have a bunch of apps, one is `Starting` and the rest are `Running`, then only the `Starting` app can be filtered out. If my `Starting` app happens to be a single-pod app, then even though the pod is running, it will need to wait for at least 5 minutes before other apps can be scheduled. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + var acceptedApp *SchedulingApplication + var foundStarting bool + for _, app := range apps { + // found a starting app clear out the accepted app (independent of pending resources) + if app.isStarting() { + foundStarting = true + acceptedApp = nil + } + // Now just look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + // filter accepted apps + if app.isAccepted() { + // check if we have not seen a starting app + // replace the currently tracked accepted app if this is an older one + if !foundStarting && (acceptedApp == nil || acceptedApp.ApplicationInfo.SubmissionTime > app.ApplicationInfo.SubmissionTime) { + acceptedApp = app + } + continue + } + // this is a running or starting app add it to the list + filteredApps = append(filteredApps, app) + } + } + // just add the accepted app if we need to: apps are not sorted yet Review comment:
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + var acceptedApp *SchedulingApplication + var foundStarting bool + for _, app := range apps { + // found a starting app clear out the accepted app (independent of pending resources) + if app.isStarting() { + foundStarting = true + acceptedApp = nil + } + // Now just look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + // filter accepted apps + if app.isAccepted() { + // check if we have not seen a starting app + // replace the currently tracked accepted app if this is an older one + if !foundStarting && (acceptedApp == nil || acceptedApp.ApplicationInfo.SubmissionTime > app.ApplicationInfo.SubmissionTime) { + acceptedApp = app + } + continue + } + // this is a running or starting app add it to the list + filteredApps = append(filteredApps, app) + } + } + // just add the accepted app if we need to: apps are not sorted yet Review comment:
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + var acceptedApp *SchedulingApplication + var foundStarting bool + for _, app := range apps { + // found a starting app clear out the accepted app (independent of pending resources) + if app.isStarting() { + foundStarting = true + acceptedApp = nil + } + // Now just look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + // filter accepted apps + if app.isAccepted() { + // check if we have not seen a starting app + // replace the currently tracked accepted app if this is an older one + if !foundStarting && (acceptedApp == nil || acceptedApp.ApplicationInfo.SubmissionTime > app.ApplicationInfo.SubmissionTime) { + acceptedApp = app + } + continue + } + // this is a running or starting app add it to the list + filteredApps = append(filteredApps, app) + } + } + // just add the accepted app if we need to: apps are not sorted yet Review comment:
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744368 ## File path: pkg/scheduler/sorters.go ## @@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType SortType) { return comp < 0 }) } + metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart) } -func sortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource) { - // TODO add latency metric +func sortApplications(apps map[string]*SchedulingApplication, sortType policies.SortPolicy, globalResource *resources.Resource) []*SchedulingApplication { + sortingStart := time.Now() + var sortedApps []*SchedulingApplication switch sortType { - case FairSortPolicy: + case policies.FairSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by usage - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return resources.CompUsageRatio(l.getAssumeAllocated(), r.getAssumeAllocated(), globalResource) < 0 }) - case FifoSortPolicy: + case policies.FifoSortPolicy: + sortedApps = filterOnPendingResources(apps) // Sort by submission time oldest first - sort.SliceStable(apps, func(i, j int) bool { - l := apps[i] - r := apps[j] + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime }) + case policies.StateAwarePolicy: + sortedApps = stateAwareFilter(apps) + // Sort by submission time oldest first + sort.SliceStable(sortedApps, func(i, j int) bool { + l := sortedApps[i] + r := sortedApps[j] + return l.ApplicationInfo.SubmissionTime < r.ApplicationInfo.SubmissionTime + }) + } + metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart) + return sortedApps +} + +func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication { + filteredApps := make([]*SchedulingApplication, 0) + for _, app := range apps { + // Only look at app when pending-res > 0 + if resources.StrictlyGreaterThanZero(app.GetPendingResource()) { + filteredApps = append(filteredApps, app) + } + } + return filteredApps +} + +// This filter only allows one (1) application with a state that is not running in the list of candidates. +// The preference is a state of Starting. If we can not find an app with a starting state we will use an app +// with an Accepted state. However if there is an app with a Starting state even with no pending resource +// requests, no Accepted apps can be scheduled. An app in New state does not have any asks and can never be +// scheduled. +func stateAwareFilter(apps map[string]*SchedulingApplication) []*SchedulingApplication { Review comment: We need a unit test (UT) for this function. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r42773 ## File path: pkg/scheduler/scheduling_application.go ## @@ -633,3 +668,44 @@ func (sa *SchedulingApplication) recoverOnNode(node *SchedulingNode, ask *schedu zap.Error(err)) } } + +// Application status methods reflecting the underlying app object state +// link back to the underlying app object to prevent out of sync states +func (sa *SchedulingApplication) isAccepted() bool { + return sa.ApplicationInfo.IsAccepted() +} + +func (sa *SchedulingApplication) isStarting() bool { + return sa.ApplicationInfo.IsStarting() +} + +func (sa *SchedulingApplication) isNew() bool { + return sa.ApplicationInfo.IsNew() +} + +func (sa *SchedulingApplication) isWaiting() bool { + return sa.ApplicationInfo.IsWaiting() +} + +// Move the app state to running after allocation has been recovered. +// Since we do not add allocations in the normal way states will not change during recovery. +// There could also be multiple nodes that recover the app and +// This moves via starting directly to running. +func (sa *SchedulingApplication) finishRecovery() { + // no need to do anything if we are already running + if sa.ApplicationInfo.IsRunning() { + return + } + // this is the first recovered allocation: move to starting this cannot fail + if sa.ApplicationInfo.IsAccepted() { + // ignore the errors explicitly and marked as nolint + //nolint: errcheck + _ = sa.ApplicationInfo.HandleApplicationEvent(cache.StartApplication) + _ = sa.ApplicationInfo.HandleApplicationEvent(cache.RunApplication) + } + // log unexpected state + if !sa.ApplicationInfo.IsRunning() { + log.Logger().Warn("State of recovered app is not the expected RUNNING state", + zap.String("state", sa.ApplicationInfo.GetApplicationState())) + } +} Review comment: Similarly, can we handle a `FinishRecoveryEvent` in the state machine? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427731619 ## File path: pkg/scheduler/scheduling_application.go ## @@ -183,6 +183,23 @@ func (sa *SchedulingApplication) removeAllocationAsk(allocKey string) int { delete(sa.requests, allocKey) } } + // Check if we need to change state based on the ask removal: + // 1) if pending is zero (no more asks left) + // 2) if allocation is zero (nothing is running) + // Change the state to waiting + if resources.IsZero(sa.pending) && resources.IsZero(sa.GetAllocatedResource()) { + oldState := sa.ApplicationInfo.GetApplicationState() + if err := sa.ApplicationInfo.HandleApplicationEvent(cache.WaitApplication); err != nil { + log.Logger().Warn("Application state not changed to Waiting while removing ask(s)", + zap.String("oldState", oldState), + zap.Int("releaseAllocs", toRelease), + zap.Error(err)) + } else { + log.Logger().Debug("Application state change to Waiting while removing ask(s)", + zap.String("oldState", oldState), + zap.Int("releaseAllocs", toRelease)) Review comment: Duplicate logging? We already log each time the app state changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427730285 ## File path: pkg/scheduler/scheduler.go ## @@ -407,12 +417,6 @@ func (s *Scheduler) processApplicationUpdateEvent(ev *schedulerevent.SchedulerAp acceptedApps = append(acceptedApps, &si.AcceptedApplication{ ApplicationID: app.ApplicationID, }) - // app is accepted by scheduler - err = app.HandleApplicationEvent(cache.AcceptApplication) - if err != nil { - log.Logger().Debug("cache event handling error returned", - zap.Error(err)) - } Review comment: The Accepted state means the app is accepted by the scheduler (as opposed to Rejected). This is the proper place for transitioning the app to `Accepted`. Why do we want to move this to another place? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427721780 ## File path: pkg/scheduler/scheduling_application.go ## @@ -207,10 +224,28 @@ func (sa *SchedulingApplication) addAllocationAsk(ask *schedulingAllocationAsk) oldAskResource = resources.Multiply(oldAsk.AllocatedResource, int64(oldAsk.getPendingAskRepeat())) } - delta.SubFrom(oldAskResource) + // Check if we need to change state based on the ask added, there are two cases: + // 1) first ask added on a new app: state is New + // 2) all asks and allocation have been removed: state is Waiting + // Accept the app and get it scheduling (again) + if sa.isNew() { + if err := sa.ApplicationInfo.HandleApplicationEvent(cache.AcceptApplication); err != nil { + log.Logger().Debug("Application state change failed while adding first ask", + zap.String("currentState", sa.ApplicationInfo.GetApplicationState()), + zap.Error(err)) + } + } + if sa.isWaiting() { + if err := sa.ApplicationInfo.HandleApplicationEvent(cache.RunApplication); err != nil { + log.Logger().Debug("Application state change failed while adding new ask", + zap.String("currentState", sa.ApplicationInfo.GetApplicationState()), + zap.Error(err)) + } + } Review comment: Similar to the comments I had for the `addAllocation`, I think we can achieve this by leveraging the state machine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting
yangwwei commented on a change in pull request #145: URL: https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427720947 ## File path: pkg/cache/application_info.go ## @@ -73,45 +81,117 @@ func (ai *ApplicationInfo) GetAllAllocations() []*AllocationInfo { return allocations } -// Return the current state for the application. +// Return the current state or a checked specific state for the application. // The state machine handles the locking. func (ai *ApplicationInfo) GetApplicationState() string { return ai.stateMachine.Current() } +func (ai *ApplicationInfo) IsStarting() bool { + return ai.stateMachine.Is(Starting.String()) +} + +func (ai *ApplicationInfo) IsAccepted() bool { + return ai.stateMachine.Is(Accepted.String()) +} + +func (ai *ApplicationInfo) IsNew() bool { + return ai.stateMachine.Is(New.String()) +} + +func (ai *ApplicationInfo) IsRunning() bool { + return ai.stateMachine.Is(Running.String()) +} + +func (ai *ApplicationInfo) IsWaiting() bool { + return ai.stateMachine.Is(Waiting.String()) +} + // Handle the state event for the application. // The state machine handles the locking. -func (ai *ApplicationInfo) HandleApplicationEvent(event ApplicationEvent) error { - err := ai.stateMachine.Event(event.String(), ai.ApplicationID) +func (ai *ApplicationInfo) HandleApplicationEvent(event applicationEvent) error { + err := ai.stateMachine.Event(event.String(), ai) // handle the same state transition not nil error (limit of fsm). if err != nil && err.Error() == "no transition" { return nil } return err } +// Set the starting timer to make sure the application will not get stuck in a starting state too long. +// This prevents an app from not progressing to Running when it only has 1 allocation. +// Called when entering the Starting state by the state machine. 
+func (ai *ApplicationInfo) setStartingTimer() { + ai.Lock() + defer ai.Unlock() + + log.Logger().Debug("Application Starting state timer initiated", + zap.String("appID", ai.ApplicationID), + zap.Duration("timeout", startingTimeout)) + ai.stateTimer = time.AfterFunc(startingTimeout, ai.timeOutStarting) +} + +// Clear the starting timer. If the application has progressed out of the starting state we need to stop the +// timer and clean up. +// Called when leaving the Starting state by the state machine. +func (ai *ApplicationInfo) clearStartingTimer() { + ai.Lock() + defer ai.Unlock() + + ai.stateTimer.Stop() + ai.stateTimer = nil +} + +// In case of state aware scheduling we do not want to get stuck in starting as we might have an application that only +// requires one allocation. +// This will progress the state of the application from Starting to Running +func (ai *ApplicationInfo) timeOutStarting() { + // make sure we are still in the right state + // we could have been killed or something might have happened while waiting for a lock + if ai.IsStarting() { + log.Logger().Warn("Application in starting state timed out: auto progress", + zap.String("applicationID", ai.ApplicationID), + zap.String("state", ai.stateMachine.Current())) + + //nolint: errcheck + _ = ai.HandleApplicationEvent(RunApplication) + } +} + // Return the total allocated resources for the application. func (ai *ApplicationInfo) GetAllocatedResource() *resources.Resource { - ai.lock.RLock() - defer ai.lock.RUnlock() + ai.RLock() + defer ai.RUnlock() return ai.allocatedResource.Clone() } // Set the leaf queue the application runs in. Update the queue name also to match as this might be different from the // queue that was given when submitting the application. 
func (ai *ApplicationInfo) SetQueue(leaf *QueueInfo) { - ai.lock.Lock() - defer ai.lock.Unlock() + ai.Lock() + defer ai.Unlock() ai.leafQueue = leaf ai.QueueName = leaf.GetQueuePath() } // Add a new allocation to the application func (ai *ApplicationInfo) addAllocation(info *AllocationInfo) { - ai.lock.Lock() - defer ai.lock.Unlock() + // progress the state based on where we are, we cannot fail in this case + // do NOT change the order as that will break the state + // ignore the errors explicitly and marked as nolint + if ai.IsStarting() { + //nolint: errcheck + _ = ai.HandleApplicationEvent(RunApplication) + } + if ai.IsAccepted() { + //nolint: errcheck + _ = ai.HandleApplicationEvent(StartApplication) + } + // add the allocation + ai.Lock() + defer ai.Unlock() Review comment: Do we need these state checks? I think we should leverage the state m