pbacsko commented on code in PR #1034:
URL: https://github.com/apache/yunikorn-core/pull/1034#discussion_r2451497199
##########
pkg/scheduler/objects/queue.go:
##########
@@ -74,42 +74,46 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a
merge of the
// parent properties with the config for this queue only manipulated
during creation
// of the queue or via a queue configuration update.
- properties map[string]string
- adminACL security.ACL // admin ACL
- submitACL security.ACL // submit ACL
- maxResource *resources.Resource // When not set, max = nil
- guaranteedResource *resources.Resource // When not set, Guaranteed
== 0
- isLeaf bool // this is a leaf queue or
not (i.e. parent)
- isManaged bool // queue is part of the
config, not auto created
- stateMachine *fsm.FSM // the state of the queue
for scheduling
- stateTime time.Time // last time the state was
updated (needed for cleanup)
- maxRunningApps uint64
- runningApps uint64
- allocatingAcceptedApps map[string]bool
- template *template.Template
- queueEvents *schedEvt.QueueEvents
+ properties map[string]string
+ adminACL security.ACL // admin ACL
+ submitACL security.ACL // submit ACL
+ maxResource *resources.Resource // When not set, max =
nil
+ guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
+ isLeaf bool // this is a leaf queue
or not (i.e. parent)
+ isManaged bool // queue is part of the
config, not auto created
+ stateMachine *fsm.FSM // the state of the
queue for scheduling
+ stateTime time.Time // last time the state
was updated (needed for cleanup)
+ maxRunningApps uint64
+ runningApps uint64
+ allocatingAcceptedApps map[string]bool
+ template *template.Template
+ queueEvents *schedEvt.QueueEvents
+ quotaChangePreemptionDelay uint64
+ quotaChangePreemptionTimer *time.Timer
Review Comment:
The crucial question for this PR: do we want to initiate the preemption logic
from a Timer goroutine? I am strongly against that. It is much easier to reason
about quota preemption versus other kinds of preemption if it happens within
the scheduling cycle.
##########
pkg/scheduler/objects/queue.go:
##########
@@ -369,17 +372,74 @@ func (sq *Queue) applyConf(conf configs.QueueConfig,
silence bool) error {
// Load the max & guaranteed resources and maxApps for all but the root
queue
if sq.Name != configs.RootQueue {
+ oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
+ sq.setPreemptionSettings(oldMaxResource, conf)
}
sq.properties = conf.Properties
return nil
}
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource,
conf configs.QueueConfig) {
+ newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+ if err != nil {
+ log.Log(log.SchedQueue).Error("parsing failed on max resources
this should not happen",
+ zap.String("queue", sq.QueuePath),
+ zap.Error(err))
+ return
+ }
+ reset := false
+ set := false
+ switch {
+ // Set max res earlier but not now
+ case resources.IsZero(newMaxResource) &&
!resources.IsZero(oldMaxResource):
+ reset = true
+ // Set max res now but not earlier
+ case !resources.IsZero(newMaxResource) &&
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+ set = true
+ // Set max res earlier and now as well
+ default:
+ switch {
+ // Quota decrease
+ case resources.StrictlyGreaterThan(oldMaxResource,
newMaxResource) && conf.Preemption.Delay != 0:
+ set = true
+ // Quota remains as is but delay has changed
+ case resources.Equals(oldMaxResource, newMaxResource) &&
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay !=
conf.Preemption.Delay:
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+
sq.quotaChangePreemptionTimer.Reset(time.Duration(sq.quotaChangePreemptionDelay))
Review Comment:
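This is why I don't like having a Timer object here: correctness depends on a
precise understanding of how Timer works in Go (see
https://antonz.org/timer-reset/). For example, if this branch runs before any
timer has been created, quotaChangePreemptionTimer is still nil and the Reset
call will panic.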
##########
pkg/scheduler/objects/queue.go:
##########
@@ -369,17 +372,74 @@ func (sq *Queue) applyConf(conf configs.QueueConfig,
silence bool) error {
// Load the max & guaranteed resources and maxApps for all but the root
queue
if sq.Name != configs.RootQueue {
+ oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
+ sq.setPreemptionSettings(oldMaxResource, conf)
}
sq.properties = conf.Properties
return nil
}
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource,
conf configs.QueueConfig) {
+ newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+ if err != nil {
+ log.Log(log.SchedQueue).Error("parsing failed on max resources
this should not happen",
+ zap.String("queue", sq.QueuePath),
+ zap.Error(err))
+ return
+ }
+ reset := false
+ set := false
+ switch {
+ // Set max res earlier but not now
+ case resources.IsZero(newMaxResource) &&
!resources.IsZero(oldMaxResource):
+ reset = true
+ // Set max res now but not earlier
+ case !resources.IsZero(newMaxResource) &&
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+ set = true
+ // Set max res earlier and now as well
+ default:
+ switch {
+ // Quota decrease
+ case resources.StrictlyGreaterThan(oldMaxResource,
newMaxResource) && conf.Preemption.Delay != 0:
+ set = true
+ // Quota remains as is but delay has changed
+ case resources.Equals(oldMaxResource, newMaxResource) &&
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay !=
conf.Preemption.Delay:
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+
sq.quotaChangePreemptionTimer.Reset(time.Duration(sq.quotaChangePreemptionDelay))
+ // Quota increase
+ default:
+ reset = true
+ }
+ }
+
+ // Set preemption settings
+ if set {
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ sq.quotaChangePreemptionTimer =
time.AfterFunc(time.Duration(sq.quotaChangePreemptionDelay),
sq.tryPreemptionToEnforceQuota)
+ }
+
+ // Reset preemption settings
+ if reset {
+ if sq.quotaChangePreemptionDelay != 0 {
+ sq.quotaChangePreemptionDelay = 0
+ sq.quotaChangePreemptionTimer.Stop()
+ sq.quotaChangePreemptionTimer = nil
+ }
+ }
+}
+
+// tryPreemptionToEnforceQuota Try Preemption to enforce quota change if the
quota change preemption delay is reached
+func (sq *Queue) tryPreemptionToEnforceQuota() {
+
+}
Review Comment:
Leave this out for now: it has no implementation yet, so there is nothing useful for anything to call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]