pbacsko commented on code in PR #1034:
URL: https://github.com/apache/yunikorn-core/pull/1034#discussion_r2451497199


##########
pkg/scheduler/objects/queue.go:
##########
@@ -74,42 +74,46 @@ type Queue struct {
        // The queue properties should be treated as immutable the value is a 
merge of the
        // parent properties with the config for this queue only manipulated 
during creation
        // of the queue or via a queue configuration update.
-       properties             map[string]string
-       adminACL               security.ACL        // admin ACL
-       submitACL              security.ACL        // submit ACL
-       maxResource            *resources.Resource // When not set, max = nil
-       guaranteedResource     *resources.Resource // When not set, Guaranteed 
== 0
-       isLeaf                 bool                // this is a leaf queue or 
not (i.e. parent)
-       isManaged              bool                // queue is part of the 
config, not auto created
-       stateMachine           *fsm.FSM            // the state of the queue 
for scheduling
-       stateTime              time.Time           // last time the state was 
updated (needed for cleanup)
-       maxRunningApps         uint64
-       runningApps            uint64
-       allocatingAcceptedApps map[string]bool
-       template               *template.Template
-       queueEvents            *schedEvt.QueueEvents
+       properties                 map[string]string
+       adminACL                   security.ACL        // admin ACL
+       submitACL                  security.ACL        // submit ACL
+       maxResource                *resources.Resource // When not set, max = 
nil
+       guaranteedResource         *resources.Resource // When not set, 
Guaranteed == 0
+       isLeaf                     bool                // this is a leaf queue 
or not (i.e. parent)
+       isManaged                  bool                // queue is part of the 
config, not auto created
+       stateMachine               *fsm.FSM            // the state of the 
queue for scheduling
+       stateTime                  time.Time           // last time the state 
was updated (needed for cleanup)
+       maxRunningApps             uint64
+       runningApps                uint64
+       allocatingAcceptedApps     map[string]bool
+       template                   *template.Template
+       queueEvents                *schedEvt.QueueEvents
+       quotaChangePreemptionDelay uint64
+       quotaChangePreemptionTimer *time.Timer

Review Comment:
   The crucial question of this PR: do we want to initiate the preemption logic 
from a Timer goroutine? Because I'm very much against that. I think it's much 
easier to reason about the quota preemption vs other preemption if it happens 
in the scheduling cycle.



##########
pkg/scheduler/objects/queue.go:
##########
@@ -369,17 +372,74 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, 
silence bool) error {
 
        // Load the max & guaranteed resources and maxApps for all but the root 
queue
        if sq.Name != configs.RootQueue {
+               oldMaxResource := sq.maxResource
                if err = sq.setResourcesFromConf(conf.Resources); err != nil {
                        return err
                }
                sq.maxRunningApps = conf.MaxApplications
                sq.updateMaxRunningAppsMetrics()
+               sq.setPreemptionSettings(oldMaxResource, conf)
        }
 
        sq.properties = conf.Properties
        return nil
 }
 
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, 
conf configs.QueueConfig) {
+       newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+       if err != nil {
+               log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Error(err))
+               return
+       }
+       reset := false
+       set := false
+       switch {
+       // Set max res earlier but not now
+       case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
+               reset = true
+               // Set max res now but not earlier
+       case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+               set = true
+               // Set max res earlier and now as well
+       default:
+               switch {
+               // Quota decrease
+               case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
+                       set = true
+                       // Quota remains as is but delay has changed
+               case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
+                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+                       
sq.quotaChangePreemptionTimer.Reset(time.Duration(sq.quotaChangePreemptionDelay))

Review Comment:
   That's why I don't like having a Timer object here.  We need to rely on 
a proper understanding of how Timer works in Go; see: 
https://antonz.org/timer-reset/.



##########
pkg/scheduler/objects/queue.go:
##########
@@ -369,17 +372,74 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, 
silence bool) error {
 
        // Load the max & guaranteed resources and maxApps for all but the root 
queue
        if sq.Name != configs.RootQueue {
+               oldMaxResource := sq.maxResource
                if err = sq.setResourcesFromConf(conf.Resources); err != nil {
                        return err
                }
                sq.maxRunningApps = conf.MaxApplications
                sq.updateMaxRunningAppsMetrics()
+               sq.setPreemptionSettings(oldMaxResource, conf)
        }
 
        sq.properties = conf.Properties
        return nil
 }
 
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, 
conf configs.QueueConfig) {
+       newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+       if err != nil {
+               log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Error(err))
+               return
+       }
+       reset := false
+       set := false
+       switch {
+       // Set max res earlier but not now
+       case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
+               reset = true
+               // Set max res now but not earlier
+       case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+               set = true
+               // Set max res earlier and now as well
+       default:
+               switch {
+               // Quota decrease
+               case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
+                       set = true
+                       // Quota remains as is but delay has changed
+               case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
+                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+                       
sq.quotaChangePreemptionTimer.Reset(time.Duration(sq.quotaChangePreemptionDelay))
+               // Quota increase
+               default:
+                       reset = true
+               }
+       }
+
+       // Set preemption settings
+       if set {
+               sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+               sq.quotaChangePreemptionTimer = 
time.AfterFunc(time.Duration(sq.quotaChangePreemptionDelay), 
sq.tryPreemptionToEnforceQuota)
+       }
+
+       // Reset preemption settings
+       if reset {
+               if sq.quotaChangePreemptionDelay != 0 {
+                       sq.quotaChangePreemptionDelay = 0
+                       sq.quotaChangePreemptionTimer.Stop()
+                       sq.quotaChangePreemptionTimer = nil
+               }
+       }
+}
+
+// tryPreemptionToEnforceQuota Try Preemption to enforce quota change if the 
quota change preemption delay is reached
+func (sq *Queue) tryPreemptionToEnforceQuota() {
+
+}

Review Comment:
   Leave this out if no impl/nothing calls this yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to