wilfred-s commented on code in PR #429:
URL: https://github.com/apache/yunikorn-core/pull/429#discussion_r945412946
##########
pkg/scheduler/objects/queue.go:
##########
@@ -657,6 +660,9 @@ func (sq *Queue) addChildQueue(child *Queue) error {
if sq.IsDraining() {
return fmt.Errorf("cannot add a child queue when queue is
marked for deletion: %s", sq.QueuePath)
}
+ if sq.maxRunningApps != 0 && sq.maxRunningApps < child.maxRunningApps {
+ return fmt.Errorf("parent maxRunningApps must be larger than
child maxRunningApps")
+ }
Review Comment:
This check belongs in the config validation, as we already do for the "other"
resources. Otherwise the config change could fail partway through and leave the
system in an inconsistent state.
##########
pkg/scheduler/objects/application_state.go:
##########
@@ -143,6 +143,7 @@ func NewAppState() *fsm.FSM {
},
fmt.Sprintf("enter_%s", Starting.String()): func(event
*fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
+ app.queue.incRunningApps()
Review Comment:
This is the correct point (see the [usage tracking
doc](https://docs.google.com/document/d/19FYOL1Uw7tSf74qyuVN40Fv-GdWd2KrBcHJpUQLTBZ4/edit#heading=h.481iug6rohb3)),
but we need to make sure the metrics are updated in the same place as the
queue's running-app counter itself.
##########
pkg/scheduler/objects/queue.go:
##########
@@ -1292,3 +1304,67 @@ func (sq *Queue) String() string {
return fmt.Sprintf("{QueuePath: %s, State: %s, StateTime: %x,
MaxResource: %s}",
sq.QueuePath, sq.stateMachine.Current(), sq.stateTime,
sq.maxResource)
}
+
+func (sq *Queue) incRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.incRunningApps()
+ }
+ sq.internalIncRunningApps()
+}
+
+func (sq *Queue) internalIncRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps++
+}
+
+func (sq *Queue) decRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.decRunningApps()
+ }
+ sq.internalDecRunningApps()
+}
+
+func (sq *Queue) internalDecRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps--
+}
Review Comment:
This can be folded into `decRunningApps()`.
##########
pkg/scheduler/objects/queue.go:
##########
@@ -1292,3 +1304,67 @@ func (sq *Queue) String() string {
return fmt.Sprintf("{QueuePath: %s, State: %s, StateTime: %x,
MaxResource: %s}",
sq.QueuePath, sq.stateMachine.Current(), sq.stateTime,
sq.maxResource)
}
+
+func (sq *Queue) incRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.incRunningApps()
+ }
+ sq.internalIncRunningApps()
+}
+
+func (sq *Queue) internalIncRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps++
+}
+
+func (sq *Queue) decRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.decRunningApps()
+ }
+ sq.internalDecRunningApps()
+}
+
+func (sq *Queue) internalDecRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps--
+}
+
+func (sq *Queue) canRun() bool {
+ if sq == nil {
+ return false
+ }
+ ok := true
+ if sq.parent != nil {
+ ok = sq.parent.canRun()
+ }
+ return sq.internalCanRun(ok)
+}
+
+func (sq *Queue) internalCanRun(ok bool) bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ if sq.maxRunningApps == 0 {
+ return true && ok
+ }
+ return ok && sq.runningApps < sq.maxRunningApps
+}
+
+func (sq *Queue) GetRunningApps() uint64 {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.runningApps
+}
+
+func (sq *Queue) SetMaxRunningApps(maxApps uint64) {
Review Comment:
Add a comment to this exported function to indicate that it is for testing only.
##########
pkg/scheduler/objects/queue.go:
##########
@@ -1292,3 +1304,67 @@ func (sq *Queue) String() string {
return fmt.Sprintf("{QueuePath: %s, State: %s, StateTime: %x,
MaxResource: %s}",
sq.QueuePath, sq.stateMachine.Current(), sq.stateTime,
sq.maxResource)
}
+
+func (sq *Queue) incRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.incRunningApps()
+ }
+ sq.internalIncRunningApps()
+}
+
+func (sq *Queue) internalIncRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps++
+}
+
+func (sq *Queue) decRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.decRunningApps()
+ }
+ sq.internalDecRunningApps()
+}
+
+func (sq *Queue) internalDecRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps--
+}
+
+func (sq *Queue) canRun() bool {
+ if sq == nil {
+ return false
+ }
+ ok := true
+ if sq.parent != nil {
+ ok = sq.parent.canRun()
+ }
+ return sq.internalCanRun(ok)
+}
+
+func (sq *Queue) internalCanRun(ok bool) bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ if sq.maxRunningApps == 0 {
+ return true && ok
+ }
+ return ok && sq.runningApps < sq.maxRunningApps
+}
Review Comment:
I had to look twice to grasp what was going on with the `ok` flag.
Isn't this flow simpler, and also faster, since it fails fast as soon as a
parent check fails:
```
func (sq *Queue) canRun() bool {
if sq == nil {
return false
}
// check the parent(s)
if sq.parent != nil {
ok := sq.parent.canRun()
if !ok {
return false
}
}
// check this queue only if all parents allowed it
sq.RLock()
defer sq.RUnlock()
if sq.maxRunningApps == 0 {
return true
}
return sq.runningApps < sq.maxRunningApps
}
```
##########
pkg/scheduler/objects/queue.go:
##########
@@ -1292,3 +1304,67 @@ func (sq *Queue) String() string {
return fmt.Sprintf("{QueuePath: %s, State: %s, StateTime: %x,
MaxResource: %s}",
sq.QueuePath, sq.stateMachine.Current(), sq.stateTime,
sq.maxResource)
}
+
+func (sq *Queue) incRunningApps() {
+ if sq == nil {
+ return
+ }
+ if sq.parent != nil {
+ sq.parent.incRunningApps()
+ }
+ sq.internalIncRunningApps()
+}
+
+func (sq *Queue) internalIncRunningApps() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.runningApps++
+}
Review Comment:
This can be folded into `incRunningApps()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]