[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-20 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r428475700



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {

Review comment:
   Based on the discussion, let's group unnamed pods into a single app 
per namespace. This can resolve the issue.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-20 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r428228741



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {

Review comment:
   Do we have any other way to resolve this problem? This is a blocker for me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427778852



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {

Review comment:
   Basically, I think the `Running` state should not depend on whether a 
second allocation has happened for the app. Instead, we should read this info 
from the app itself: each app can have a field, e.g. 
`org.apache.yunikorn/minTaskNum` (default=1). When this is 1, the app can be 
considered `Running` as soon as one allocation is made. Later on, we can 
leverage this field for FIFO-based gang scheduling. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427774391



##
File path: pkg/cache/application_state.go
##
@@ -31,76 +31,102 @@ import (
 // --
 // application events
 // --
-type ApplicationEvent int
+type applicationEvent int
 
 const (
-   AcceptApplication ApplicationEvent = iota
-   RejectApplication
+   AcceptApplication applicationEvent = iota
+   StartApplication
RunApplication
+   WaitApplication
+   RejectApplication
CompleteApplication
KillApplication
 )
 
-func (ae ApplicationEvent) String() string {
-   return [...]string{"AcceptApplication", "RejectApplication", 
"RunApplication", "CompleteApplication", "KillApplication"}[ae]
+func (ae applicationEvent) String() string {
+   return [...]string{"AcceptApplication", "StartApplication", 
"RunApplication", "WaitApplication", "RejectApplication", 
"CompleteApplication", "KillApplication"}[ae]
 }
 
 // --
 // application states
 // --
-type ApplicationState int
+type applicationState int
 
 const (
-   New ApplicationState = iota
+   New applicationState = iota
Accepted
-   Rejected
+   Starting
Running
+   Waiting
+   Rejected
Completed
Killed

Review comment:
   I find the meaning of these states really confusing.
   Can you please add comments here, and also describe them in a doc? I think 
we need a design doc for this as well.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427773603



##
File path: pkg/scheduler/policies/sorting_policy.go
##
@@ -0,0 +1,51 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package policies
+
+import (
+   "fmt"
+)
+
+// Sort type for queues & apps.
+type SortPolicy int
+
+const (
+   FifoSortPolicy   SortPolicy = iota // fair based on usage
+   FairSortPolicy // first in first out, submit time
+   StateAwarePolicy   // only 1 app in starting state
+   UndefinedApp   // not initialised or parsing failed

Review comment:
   UndefinedApp -> Undefined





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427770746



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {

Review comment:
   One issue with this algorithm:
   if I have a bunch of apps, where one is `Starting` and the rest are `Running`, 
then only the `Starting` app can be filtered out. If my `Starting` app happens 
to be a single-pod app, then even though the pod is already running, other apps 
will need to wait at least 5 minutes before they can be scheduled. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   var acceptedApp *SchedulingApplication
+   var foundStarting bool
+   for _, app := range apps {
+   // found a starting app clear out the accepted app (independent 
of pending resources)
+   if app.isStarting() {
+   foundStarting = true
+   acceptedApp = nil
+   }
+   // Now just look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   // filter accepted apps
+   if app.isAccepted() {
+   // check if we have not seen a starting app
+   // replace the currently tracked accepted app 
if this is an older one
+   if !foundStarting && (acceptedApp == nil || 
acceptedApp.ApplicationInfo.SubmissionTime > 
app.ApplicationInfo.SubmissionTime) {
+   acceptedApp = app
+   }
+   continue
+   }
+   // this is a running or starting app add it to the list
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   // just add the accepted app if we need to: apps are not sorted yet

Review comment:
  

[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   var acceptedApp *SchedulingApplication
+   var foundStarting bool
+   for _, app := range apps {
+   // found a starting app clear out the accepted app (independent 
of pending resources)
+   if app.isStarting() {
+   foundStarting = true
+   acceptedApp = nil
+   }
+   // Now just look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   // filter accepted apps
+   if app.isAccepted() {
+   // check if we have not seen a starting app
+   // replace the currently tracked accepted app 
if this is an older one
+   if !foundStarting && (acceptedApp == nil || 
acceptedApp.ApplicationInfo.SubmissionTime > 
app.ApplicationInfo.SubmissionTime) {
+   acceptedApp = app
+   }
+   continue
+   }
+   // this is a running or starting app add it to the list
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   // just add the accepted app if we need to: apps are not sorted yet

Review comment:
  

[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744799



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   var acceptedApp *SchedulingApplication
+   var foundStarting bool
+   for _, app := range apps {
+   // found a starting app clear out the accepted app (independent 
of pending resources)
+   if app.isStarting() {
+   foundStarting = true
+   acceptedApp = nil
+   }
+   // Now just look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   // filter accepted apps
+   if app.isAccepted() {
+   // check if we have not seen a starting app
+   // replace the currently tracked accepted app 
if this is an older one
+   if !foundStarting && (acceptedApp == nil || 
acceptedApp.ApplicationInfo.SubmissionTime > 
app.ApplicationInfo.SubmissionTime) {
+   acceptedApp = app
+   }
+   continue
+   }
+   // this is a running or starting app add it to the list
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   // just add the accepted app if we need to: apps are not sorted yet

Review comment:
  

[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427744368



##
File path: pkg/scheduler/sorters.go
##
@@ -47,39 +39,101 @@ func sortQueue(queues []*SchedulingQueue, sortType 
SortType) {
return comp < 0
})
}
+   metrics.GetSchedulerMetrics().ObserveQueueSortingLatency(sortingStart)
 }
 
-func sortApplications(apps []*SchedulingApplication, sortType SortType, 
globalResource *resources.Resource) {
-   // TODO add latency metric
+func sortApplications(apps map[string]*SchedulingApplication, sortType 
policies.SortPolicy, globalResource *resources.Resource) 
[]*SchedulingApplication {
+   sortingStart := time.Now()
+   var sortedApps []*SchedulingApplication
switch sortType {
-   case FairSortPolicy:
+   case policies.FairSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by usage
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return resources.CompUsageRatio(l.getAssumeAllocated(), 
r.getAssumeAllocated(), globalResource) < 0
})
-   case FifoSortPolicy:
+   case policies.FifoSortPolicy:
+   sortedApps = filterOnPendingResources(apps)
// Sort by submission time oldest first
-   sort.SliceStable(apps, func(i, j int) bool {
-   l := apps[i]
-   r := apps[j]
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
})
+   case policies.StateAwarePolicy:
+   sortedApps = stateAwareFilter(apps)
+   // Sort by submission time oldest first
+   sort.SliceStable(sortedApps, func(i, j int) bool {
+   l := sortedApps[i]
+   r := sortedApps[j]
+   return l.ApplicationInfo.SubmissionTime < 
r.ApplicationInfo.SubmissionTime
+   })
+   }
+   metrics.GetSchedulerMetrics().ObserveAppSortingLatency(sortingStart)
+   return sortedApps
+}
+
+func filterOnPendingResources(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {
+   filteredApps := make([]*SchedulingApplication, 0)
+   for _, app := range apps {
+   // Only look at app when pending-res > 0
+   if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+   filteredApps = append(filteredApps, app)
+   }
+   }
+   return filteredApps
+}
+
+// This filter only allows one (1) application with a state that is not 
running in the list of candidates.
+// The preference is a state of Starting. If we can not find an app with a 
starting state we will use an app
+// with an Accepted state. However if there is an app with a Starting state 
even with no pending resource
+// requests, no Accepted apps can be scheduled. An app in New state does not 
have any asks and can never be
+// scheduled.
+func stateAwareFilter(apps map[string]*SchedulingApplication) 
[]*SchedulingApplication {

Review comment:
   We need a unit test for this function.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r42773



##
File path: pkg/scheduler/scheduling_application.go
##
@@ -633,3 +668,44 @@ func (sa *SchedulingApplication) recoverOnNode(node 
*SchedulingNode, ask *schedu
zap.Error(err))
}
 }
+
+// Application status methods reflecting the underlying app object state
+// link back to the underlying app object to prevent out of sync states
+func (sa *SchedulingApplication) isAccepted() bool {
+   return sa.ApplicationInfo.IsAccepted()
+}
+
+func (sa *SchedulingApplication) isStarting() bool {
+   return sa.ApplicationInfo.IsStarting()
+}
+
+func (sa *SchedulingApplication) isNew() bool {
+   return sa.ApplicationInfo.IsNew()
+}
+
+func (sa *SchedulingApplication) isWaiting() bool {
+   return sa.ApplicationInfo.IsWaiting()
+}
+
+// Move the app state to running after allocation has been recovered.
+// Since we do not add allocations in the normal way states will not change 
during recovery.
+// There could also be multiple nodes that recover the app and
+// This moves via starting directly to running.
+func (sa *SchedulingApplication) finishRecovery() {
+   // no need to do anything if we are already running
+   if sa.ApplicationInfo.IsRunning() {
+   return
+   }
+   // this is the first recovered allocation: move to starting this cannot 
fail
+   if sa.ApplicationInfo.IsAccepted() {
+   // ignore the errors explicitly and marked as nolint
+   //nolint: errcheck
+   _ = 
sa.ApplicationInfo.HandleApplicationEvent(cache.StartApplication)
+   _ = 
sa.ApplicationInfo.HandleApplicationEvent(cache.RunApplication)
+   }
+   // log unexpected state
+   if !sa.ApplicationInfo.IsRunning() {
+   log.Logger().Warn("State of recovered app is not the expected 
RUNNING state",
+   zap.String("state", 
sa.ApplicationInfo.GetApplicationState()))
+   }
+}

Review comment:
   Similarly, can we handle a `FinishRecoveryEvent` in the state machine?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427731619



##
File path: pkg/scheduler/scheduling_application.go
##
@@ -183,6 +183,23 @@ func (sa *SchedulingApplication) 
removeAllocationAsk(allocKey string) int {
delete(sa.requests, allocKey)
}
}
+   // Check if we need to change state based on the ask removal:
+   // 1) if pending is zero (no more asks left)
+   // 2) if allocation is zero (nothing is running)
+   // Change the state to waiting
+   if resources.IsZero(sa.pending) && 
resources.IsZero(sa.GetAllocatedResource()) {
+   oldState := sa.ApplicationInfo.GetApplicationState()
+   if err := 
sa.ApplicationInfo.HandleApplicationEvent(cache.WaitApplication); err != nil {
+   log.Logger().Warn("Application state not changed to 
Waiting while removing ask(s)",
+   zap.String("oldState", oldState),
+   zap.Int("releaseAllocs", toRelease),
+   zap.Error(err))
+   } else {
+   log.Logger().Debug("Application state change to Waiting 
while removing ask(s)",
+   zap.String("oldState", oldState),
+   zap.Int("releaseAllocs", toRelease))

Review comment:
   Is this duplicate logging? We already log every time the app state changes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427730285



##
File path: pkg/scheduler/scheduler.go
##
@@ -407,12 +417,6 @@ func (s *Scheduler) processApplicationUpdateEvent(ev 
*schedulerevent.SchedulerAp
acceptedApps = append(acceptedApps, 
&si.AcceptedApplication{
ApplicationID: app.ApplicationID,
})
-   // app is accepted by scheduler
-   err = 
app.HandleApplicationEvent(cache.AcceptApplication)
-   if err != nil {
-   log.Logger().Debug("cache event 
handling error returned",
-   zap.Error(err))
-   }

Review comment:
   The Accepted state means the app has been accepted by the scheduler (as opposed to Rejected). This is the proper place to transition the app to `Accepted`. Why do we want to move it elsewhere? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427721780



##
File path: pkg/scheduler/scheduling_application.go
##
@@ -207,10 +224,28 @@ func (sa *SchedulingApplication) addAllocationAsk(ask 
*schedulingAllocationAsk)
oldAskResource = resources.Multiply(oldAsk.AllocatedResource, 
int64(oldAsk.getPendingAskRepeat()))
}
 
-   delta.SubFrom(oldAskResource)
+   // Check if we need to change state based on the ask added, there are 
two cases:
+   // 1) first ask added on a new app: state is New
+   // 2) all asks and allocation have been removed: state is Waiting
+   // Accept the app and get it scheduling (again)
+   if sa.isNew() {
+   if err := 
sa.ApplicationInfo.HandleApplicationEvent(cache.AcceptApplication); err != nil {
+   log.Logger().Debug("Application state change failed 
while adding first ask",
+   zap.String("currentState", 
sa.ApplicationInfo.GetApplicationState()),
+   zap.Error(err))
+   }
+   }
+   if sa.isWaiting() {
+   if err := 
sa.ApplicationInfo.HandleApplicationEvent(cache.RunApplication); err != nil {
+   log.Logger().Debug("Application state change failed 
while adding new ask",
+   zap.String("currentState", 
sa.ApplicationInfo.GetApplicationState()),
+   zap.Error(err))
+   }
+   }

Review comment:
   Similar to my comments on `addAllocation`, I think we can achieve this by leveraging the state machine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #145: [YUNIKORN-99] Add state based app sorting

2020-05-19 Thread GitBox


yangwwei commented on a change in pull request #145:
URL: 
https://github.com/apache/incubator-yunikorn-core/pull/145#discussion_r427720947



##
File path: pkg/cache/application_info.go
##
@@ -73,45 +81,117 @@ func (ai *ApplicationInfo) GetAllAllocations() 
[]*AllocationInfo {
return allocations
 }
 
-// Return the current state for the application.
+// Return the current state or a checked specific state for the application.
 // The state machine handles the locking.
 func (ai *ApplicationInfo) GetApplicationState() string {
return ai.stateMachine.Current()
 }
 
+func (ai *ApplicationInfo) IsStarting() bool {
+   return ai.stateMachine.Is(Starting.String())
+}
+
+func (ai *ApplicationInfo) IsAccepted() bool {
+   return ai.stateMachine.Is(Accepted.String())
+}
+
+func (ai *ApplicationInfo) IsNew() bool {
+   return ai.stateMachine.Is(New.String())
+}
+
+func (ai *ApplicationInfo) IsRunning() bool {
+   return ai.stateMachine.Is(Running.String())
+}
+
+func (ai *ApplicationInfo) IsWaiting() bool {
+   return ai.stateMachine.Is(Waiting.String())
+}
+
 // Handle the state event for the application.
 // The state machine handles the locking.
-func (ai *ApplicationInfo) HandleApplicationEvent(event ApplicationEvent) 
error {
-   err := ai.stateMachine.Event(event.String(), ai.ApplicationID)
+func (ai *ApplicationInfo) HandleApplicationEvent(event applicationEvent) 
error {
+   err := ai.stateMachine.Event(event.String(), ai)
// handle the same state transition not nil error (limit of fsm).
if err != nil && err.Error() == "no transition" {
return nil
}
return err
 }
 
+// Set the starting timer to make sure the application will not get stuck in a 
starting state too long.
+// This prevents an app from not progressing to Running when it only has 1 
allocation.
+// Called when entering the Starting state by the state machine.
+func (ai *ApplicationInfo) setStartingTimer() {
+   ai.Lock()
+   defer ai.Unlock()
+
+   log.Logger().Debug("Application Starting state timer initiated",
+   zap.String("appID", ai.ApplicationID),
+   zap.Duration("timeout", startingTimeout))
+   ai.stateTimer = time.AfterFunc(startingTimeout, ai.timeOutStarting)
+}
+
+// Clear the starting timer. If the application has progressed out of the 
starting state we need to stop the
+// timer and clean up.
+// Called when leaving the Starting state by the state machine.
+func (ai *ApplicationInfo) clearStartingTimer() {
+   ai.Lock()
+   defer ai.Unlock()
+
+   ai.stateTimer.Stop()
+   ai.stateTimer = nil
+}
+
+// In case of state aware scheduling we do not want to get stuck in starting 
as we might have an application that only
+// requires one allocation.
+// This will progress the state of the application from Starting to Running
+func (ai *ApplicationInfo) timeOutStarting() {
+   // make sure we are still in the right state
+   // we could have been killed or something might have happened while 
waiting for a lock
+   if ai.IsStarting() {
+   log.Logger().Warn("Application in starting state timed out: 
auto progress",
+   zap.String("applicationID", ai.ApplicationID),
+   zap.String("state", ai.stateMachine.Current()))
+
+   //nolint: errcheck
+   _ = ai.HandleApplicationEvent(RunApplication)
+   }
+}
+
 // Return the total allocated resources for the application.
 func (ai *ApplicationInfo) GetAllocatedResource() *resources.Resource {
-   ai.lock.RLock()
-   defer ai.lock.RUnlock()
+   ai.RLock()
+   defer ai.RUnlock()
 
return ai.allocatedResource.Clone()
 }
 
 // Set the leaf queue the application runs in. Update the queue name also to 
match as this might be different from the
 // queue that was given when submitting the application.
 func (ai *ApplicationInfo) SetQueue(leaf *QueueInfo) {
-   ai.lock.Lock()
-   defer ai.lock.Unlock()
+   ai.Lock()
+   defer ai.Unlock()
 
ai.leafQueue = leaf
ai.QueueName = leaf.GetQueuePath()
 }
 
 // Add a new allocation to the application
 func (ai *ApplicationInfo) addAllocation(info *AllocationInfo) {
-   ai.lock.Lock()
-   defer ai.lock.Unlock()
+   // progress the state based on where we are, we cannot fail in this case
+   // do NOT change the order as that will break the state
+   // ignore the errors explicitly and marked as nolint
+   if ai.IsStarting() {
+   //nolint: errcheck
+   _ = ai.HandleApplicationEvent(RunApplication)
+   }
+   if ai.IsAccepted() {
+   //nolint: errcheck
+   _ = ai.HandleApplicationEvent(StartApplication)
+   }
+   // add the allocation
+   ai.Lock()
+   defer ai.Unlock()

Review comment:
   Do we need these state checks?
   I think we should leverage the state m