wilfred-s commented on code in PR #1061:
URL: https://github.com/apache/yunikorn-core/pull/1061#discussion_r2692671753


##########
pkg/scheduler/objects/queue.go:
##########
@@ -392,85 +382,120 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, 
silence bool) error {
 
        if !sq.isLeaf {
                if err = sq.setTemplate(conf.ChildTemplate); err != nil {
-                       return err
+                       return nil, err
                }
        }
 
+       oldMaxResource := sq.maxResource
        // Load the max & guaranteed resources and maxApps for all but the root 
queue
        if sq.Name != configs.RootQueue {
-               oldMaxResource := sq.maxResource
                if err = sq.setResourcesFromConf(conf.Resources); err != nil {
-                       return err
+                       return nil, err
                }
                sq.maxRunningApps = conf.MaxApplications
                sq.updateMaxRunningAppsMetrics()
-               sq.setPreemptionSettings(oldMaxResource, conf)
        }
-
        sq.properties = conf.Properties
-       return nil
+       return oldMaxResource, nil
 }
 
-// setPreemptionSettings Set Quota change preemption settings
-func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, 
conf configs.QueueConfig) {
-       newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
-       if err != nil {
-               log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
-                       zap.String("queue", sq.QueuePath),
-                       zap.Error(err))
+// setPreemptionTime set the time the quota preemption should be triggered 
when a quota is changed.
+// Updates the time if the delay is set / changes and the preemption is not 
running yet.
+// This function MUST be called holding the lock for the queue.
+func (sq *Queue) setPreemptionTime(oldMaxResource *resources.Resource, 
oldDelay time.Duration) {
+       // if quota preemption is running nothing we do not have an influence
+       // if the delay for the queue is not set do not trigger preemption
+       if sq.isQuotaPreemptionRunning || sq.quotaPreemptionDelay == 0 {
                return
        }
-
-       switch {
-       // Set max res earlier but not now
-       case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
-               sq.quotaChangePreemptionDelay = 0
-               sq.quotaChangePreemptionStartTime = time.Time{}
-               // Set max res now but not earlier
-       case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
-               sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-               sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(int64(sq.quotaChangePreemptionDelay)) * 
time.Second) //nolint:gosec
-               // Set max res earlier and now as well
-       default:
-               switch {
-               // Quota decrease
-               case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
-                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
-                       // Quota increase
-               case resources.StrictlyGreaterThan(newMaxResource, 
oldMaxResource) && conf.Preemption.Delay != 0:
-                       sq.quotaChangePreemptionDelay = 0
-                       sq.quotaChangePreemptionStartTime = time.Time{}
-                       // Quota remains as is but delay has changed
-               case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
-                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
-               default:
-                       // noop
+       // if no current limit we should not preempt even if it was set 
earlier, clear the start time
+       if resources.IsZero(sq.maxResource) {
+               sq.quotaPreemptionStartTime = time.Time{}
+               log.Log(log.SchedQueue).Info("removed quota preemption start 
time",
+                       zap.String("queue", sq.QueuePath))
+               return
+       }
+       // adjust the startup based on the diff between the delays if no change
+       if resources.Equals(oldMaxResource, sq.maxResource) {
+               if sq.quotaPreemptionStartTime.IsZero() {
+                       // if a delay is set later than the quota change we 
would like to trigger preemption even if no change for
+                       // max resources is detected. preemption trigger will 
clean up if usage is below max already.
+                       if oldDelay == 0 && sq.quotaPreemptionDelay > 0 {
+                               sq.quotaPreemptionStartTime = 
time.Now().Add(sq.quotaPreemptionDelay)
+                               log.Log(log.SchedQueue).Info("set quota 
preemption start time based on delay change",
+                                       zap.String("queue", sq.QueuePath),
+                                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime))
+                       }
+               } else {
+                       if oldDelay != sq.quotaPreemptionDelay {
+                               // apply the delta: this can be positive or 
negative does not matter. The resulting time could be in the
+                               // past: also not an issue.
+                               sq.quotaPreemptionStartTime = 
sq.quotaPreemptionStartTime.Add(sq.quotaPreemptionDelay - oldDelay)
+                               log.Log(log.SchedQueue).Info("updated quota 
preemption start time based on delay change",
+                                       zap.String("queue", sq.QueuePath),
+                                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime),
+                                       zap.Duration("delta", 
sq.quotaPreemptionDelay-oldDelay))
+                       }
                }
+               return
+       }
+       // quota has been lowered, need to set the start time.
+       if resources.StrictlyGreaterThan(oldMaxResource, sq.maxResource) {
+               if !sq.quotaPreemptionStartTime.IsZero() {
+                       log.Log(log.SchedQueue).Info("quota preemption time 
already set from earlier quota change",
+                               zap.String("queue", sq.QueuePath),
+                               zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime))
+                       return

Review Comment:
   > Another question, are we clearing start time when quota has lowered at T1 
and then increased again at T2 before the delay has passed?
   
   There are two cases:
   1: if the quota at T2 is above the usage at the point in time the change is 
made quota preemption will be cancelled by resetting the start time.
   2: if the quota at T2 is below the usage the old start time will be used but 
the new quota will be enforced not the old one. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to